Merge pull request #324 from containous/fix-kv-backend

Fix KV backend
This commit is contained in:
Vincent Demeester 2016-04-20 08:01:32 +02:00
commit b3b658a955
2 changed files with 59 additions and 36 deletions

View file

@ -17,11 +17,9 @@ curl -i -H "Accept: application/json" -X PUT -d "2" ht
# frontend 1
curl -i -H "Accept: application/json" -X PUT -d "backend2" http://localhost:8500/v1/kv/traefik/frontends/frontend1/backend
curl -i -H "Accept: application/json" -X PUT -d "http" http://localhost:8500/v1/kv/traefik/frontends/frontend1/entrypoints
curl -i -H "Accept: application/json" -X PUT -d "Host" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/rule
curl -i -H "Accept: application/json" -X PUT -d "test.localhost" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/value
curl -i -H "Accept: application/json" -X PUT -d "Host:test.localhost" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/rule
# frontend 2
curl -i -H "Accept: application/json" -X PUT -d "backend1" http://localhost:8500/v1/kv/traefik/frontends/frontend2/backend
curl -i -H "Accept: application/json" -X PUT -d "http,https" http://localhost:8500/v1/kv/traefik/frontends/frontend2/entrypoints
curl -i -H "Accept: application/json" -X PUT -d "Path" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/rule
curl -i -H "Accept: application/json" -X PUT -d "/test" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/value
curl -i -H "Accept: application/json" -X PUT -d "http" http://localhost:8500/v1/kv/traefik/frontends/frontend2/entrypoints
curl -i -H "Accept: application/json" -X PUT -d "Path:/test" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/rule

View file

@ -10,8 +10,10 @@ import (
"text/template"
"time"
"errors"
"github.com/BurntSushi/ty/fun"
log "github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/docker/libkv"
@ -37,25 +39,38 @@ type KvTLS struct {
}
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) {
for {
operation := func() error {
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
if err != nil {
log.Errorf("Failed to WatchTree %s", err)
continue
return err
}
select {
case <-stop:
return
case <-events:
configuration := provider.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
for {
select {
case <-stop:
return nil
case _, ok := <-events:
if !ok {
return errors.New("watchtree channel closed")
}
configuration := provider.loadConfig()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
}
}
}
}
}
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to KV server %+v", err)
}
}
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
@ -90,27 +105,37 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
}
}
kv, err := libkv.NewStore(
provider.storeType,
strings.Split(provider.Endpoint, ","),
storeConfig,
)
operation := func() error {
kv, err := libkv.NewStore(
provider.storeType,
strings.Split(provider.Endpoint, ","),
storeConfig,
)
if err != nil {
return err
}
if _, err := kv.List(""); err != nil {
return err
}
provider.kvclient = kv
if provider.Watch {
pool.Go(func(stop chan bool) {
provider.watchKv(configurationChan, provider.Prefix, stop)
})
}
configuration := provider.loadConfig()
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
}
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
return err
}
if _, err := kv.List(""); err != nil {
return err
}
provider.kvclient = kv
if provider.Watch {
pool.Go(func(stop chan bool) {
provider.watchKv(configurationChan, provider.Prefix, stop)
})
}
configuration := provider.loadConfig()
configurationChan <- types.ConfigMessage{
ProviderName: string(provider.storeType),
Configuration: configuration,
log.Fatalf("Cannot connect to KV server %+v", err)
}
return nil
}