From 06d2f343dde940b1b3f29fca568cc76870503757 Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Tue, 19 Apr 2016 22:06:33 +0200 Subject: [PATCH] Fix KV backend Signed-off-by: Emile Vauge --- examples/consul-config.sh | 8 ++-- provider/kv.go | 87 +++++++++++++++++++++++++-------------- 2 files changed, 59 insertions(+), 36 deletions(-) diff --git a/examples/consul-config.sh b/examples/consul-config.sh index 58952031e..e86c68201 100755 --- a/examples/consul-config.sh +++ b/examples/consul-config.sh @@ -17,11 +17,9 @@ curl -i -H "Accept: application/json" -X PUT -d "2" ht # frontend 1 curl -i -H "Accept: application/json" -X PUT -d "backend2" http://localhost:8500/v1/kv/traefik/frontends/frontend1/backend curl -i -H "Accept: application/json" -X PUT -d "http" http://localhost:8500/v1/kv/traefik/frontends/frontend1/entrypoints -curl -i -H "Accept: application/json" -X PUT -d "Host" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/rule -curl -i -H "Accept: application/json" -X PUT -d "test.localhost" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/value +curl -i -H "Accept: application/json" -X PUT -d "Host:test.localhost" http://localhost:8500/v1/kv/traefik/frontends/frontend1/routes/test_1/rule # frontend 2 curl -i -H "Accept: application/json" -X PUT -d "backend1" http://localhost:8500/v1/kv/traefik/frontends/frontend2/backend -curl -i -H "Accept: application/json" -X PUT -d "http,https" http://localhost:8500/v1/kv/traefik/frontends/frontend2/entrypoints -curl -i -H "Accept: application/json" -X PUT -d "Path" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/rule -curl -i -H "Accept: application/json" -X PUT -d "/test" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/value +curl -i -H "Accept: application/json" -X PUT -d "http" http://localhost:8500/v1/kv/traefik/frontends/frontend2/entrypoints +curl -i -H "Accept: application/json" -X PUT -d "Path:/test" http://localhost:8500/v1/kv/traefik/frontends/frontend2/routes/test_2/rule diff --git a/provider/kv.go b/provider/kv.go index aacddffd8..ebf55bce6 100644 --- a/provider/kv.go +++ b/provider/kv.go @@ -10,8 +10,10 @@ import ( "text/template" "time" + "errors" "github.com/BurntSushi/ty/fun" log "github.com/Sirupsen/logrus" + "github.com/cenkalti/backoff" "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv" @@ -37,25 +39,38 @@ type KvTLS struct { } func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) { - for { + operation := func() error { events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */) if err != nil { log.Errorf("Failed to WatchTree %s", err) - continue + return err } - select { - case <-stop: - return - case <-events: - configuration := provider.loadConfig() - if configuration != nil { - configurationChan <- types.ConfigMessage{ - ProviderName: string(provider.storeType), - Configuration: configuration, + for { + select { + case <-stop: + return nil + case _, ok := <-events: + if !ok { + return errors.New("watchtree channel closed") + } + configuration := provider.loadConfig() + if configuration != nil { + configurationChan <- types.ConfigMessage{ + ProviderName: string(provider.storeType), + Configuration: configuration, + } } } } } + + notify := func(err error, time time.Duration) { + log.Errorf("KV connection error %+v, retrying in %s", err, time) + } + err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) + if err != nil { + log.Fatalf("Cannot connect to KV server %+v", err) + } } func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error { @@ -90,27 +105,37 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool * } } - kv, err := libkv.NewStore( - provider.storeType, - strings.Split(provider.Endpoint, ","), - storeConfig, - ) + operation := func() error { + kv, err := libkv.NewStore( + provider.storeType, + strings.Split(provider.Endpoint, ","), + storeConfig, + ) + if err != nil { + return err + } + if _, err := kv.List(""); err != nil { + return err + } + provider.kvclient = kv + if provider.Watch { + pool.Go(func(stop chan bool) { + provider.watchKv(configurationChan, provider.Prefix, stop) + }) + } + configuration := provider.loadConfig() + configurationChan <- types.ConfigMessage{ + ProviderName: string(provider.storeType), + Configuration: configuration, + } + return nil + } + notify := func(err error, time time.Duration) { + log.Errorf("KV connection error %+v, retrying in %s", err, time) + } + err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify) if err != nil { - return err - } - if _, err := kv.List(""); err != nil { - return err - } - provider.kvclient = kv - if provider.Watch { - pool.Go(func(stop chan bool) { - provider.watchKv(configurationChan, provider.Prefix, stop) - }) - } - configuration := provider.loadConfig() - configurationChan <- types.ConfigMessage{ - ProviderName: string(provider.storeType), - Configuration: configuration, + log.Fatalf("Cannot connect to KV server %+v", err) } return nil }