From d63d2a8a2644a18c27b8d6cad4a23b0d11736852 Mon Sep 17 00:00:00 2001 From: Advait Shinde Date: Fri, 4 Mar 2016 22:52:08 +0000 Subject: [PATCH 1/2] Support libkv.WatchTree chan errors: - libkv.WatchTree returns a channel whose messages represent changes to the watched tree. In situations where libkv cannot read from the underlying store, libkv will close the provided channel. - This PR handles this edge case and fixes #238. --- provider/kv.go | 40 +++++++++++++++++--------------- provider/kv_test.go | 56 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 21 deletions(-) diff --git a/provider/kv.go b/provider/kv.go index a509ec0a6..122b150cd 100644 --- a/provider/kv.go +++ b/provider/kv.go @@ -35,6 +35,27 @@ type KvTLS struct { InsecureSkipVerify bool } +func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string) { + for { + chanKeys, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */) + if err != nil { + log.Errorf("Failed to WatchTree %s", err) + continue + } + + for range chanKeys { + configuration := provider.loadConfig() + if configuration != nil { + configurationChan <- types.ConfigMessage{ + ProviderName: string(provider.storeType), + Configuration: configuration, + } + } + } + log.Warnf("Intermittent failure to WatchTree KV. Retrying.") + } +} + func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error { storeConfig := &store.Config{ ConnectionTimeout: 30 * time.Second, @@ -80,24 +101,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error } provider.kvclient = kv if provider.Watch { - stopCh := make(chan struct{}) - chanKeys, err := kv.WatchTree(provider.Prefix, stopCh) - if err != nil { - return err - } - go func() { - for { - <-chanKeys - configuration := provider.loadConfig() - if configuration != nil { - configurationChan <- types.ConfigMessage{ - ProviderName: string(provider.storeType), - Configuration: configuration, - } - } - defer close(stopCh) - } - }() + go provider.watchKv(configurationChan, provider.Prefix) } configuration := provider.loadConfig() configurationChan <- types.ConfigMessage{ diff --git a/provider/kv_test.go b/provider/kv_test.go index d879ccfa4..c394dfd8d 100644 --- a/provider/kv_test.go +++ b/provider/kv_test.go @@ -2,8 +2,10 @@ package provider import ( "errors" + "github.com/containous/traefik/types" "strings" "testing" + "time" "github.com/docker/libkv/store" "reflect" @@ -231,10 +233,58 @@ func TestKvLast(t *testing.T) { } } +type KvMock struct { + Kv +} + +func (provider *KvMock) loadConfig() *types.Configuration { + return nil +} + +func TestKvWatchTree(t *testing.T) { + returnedChans := make(chan chan []*store.KVPair) + provider := &KvMock{ + Kv{ + kvclient: &Mock{ + WatchTreeMethod: func() <-chan []*store.KVPair { + c := make(chan []*store.KVPair, 10) + returnedChans <- c + return c + }, + }, + }, + } + + configChan := make(chan types.ConfigMessage) + go provider.watchKv(configChan, "prefix") + + select { + case c1 := <-returnedChans: + c1 <- []*store.KVPair{} + <-configChan + close(c1) // WatchTree chans can close due to error + case <-time.After(1 * time.Second): + } + + select { + case c2 := <-returnedChans: + c2 <- []*store.KVPair{} + <-configChan + case <-time.After(1 * time.Second): + } + + select { + case _ = <-configChan: + t.Fatalf("configChan should be empty") + default: + } +} + // Extremely limited mock store so we can test initialization type Mock struct { - Error bool - KVPairs []*store.KVPair + Error bool + KVPairs []*store.KVPair + WatchTreeMethod func() <-chan []*store.KVPair } func (s *Mock) Put(key string, value []byte, opts *store.WriteOptions) error { @@ -269,7 +319,7 @@ func (s *Mock) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, // WatchTree mock func (s *Mock) WatchTree(prefix string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - return nil, errors.New("WatchTree not supported") + return s.WatchTreeMethod(), nil } // NewLock mock From 7674a828013e8e62ea91892ef3146e2b9b487357 Mon Sep 17 00:00:00 2001 From: Advait Shinde Date: Sat, 5 Mar 2016 20:43:44 +0000 Subject: [PATCH 2/2] Fatalf for timeout cases. --- provider/kv_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/provider/kv_test.go b/provider/kv_test.go index c394dfd8d..a2438c344 100644 --- a/provider/kv_test.go +++ b/provider/kv_test.go @@ -264,6 +264,7 @@ func TestKvWatchTree(t *testing.T) { <-configChan close(c1) // WatchTree chans can close due to error case <-time.After(1 * time.Second): + t.Fatalf("Failed to create a new WatchTree chan") } select { @@ -271,6 +272,7 @@ func TestKvWatchTree(t *testing.T) { c2 <- []*store.KVPair{} <-configChan case <-time.After(1 * time.Second): + t.Fatalf("Failed to create a new WatchTree chan") } select {