From 4d71f682b3dde7d9f3aa4d3ff394037cc5ca5ae4 Mon Sep 17 00:00:00 2001 From: Kevin Pollet Date: Thu, 2 Jul 2020 11:18:04 +0200 Subject: [PATCH] Fix race condition issues with provided dynamic configuration * tests: add tests to show race condition on provider config * fix: store a deep copy of previous provider config * fix: send a deep copy of provdier config to watcher listener --- pkg/server/configurationwatcher.go | 4 +- pkg/server/configurationwatcher_test.go | 93 +++++++++++++++++++++++++ 2 files changed, 95 insertions(+), 2 deletions(-) diff --git a/pkg/server/configurationwatcher.go b/pkg/server/configurationwatcher.go index eb3f56f67..0fcd1c204 100644 --- a/pkg/server/configurationwatcher.go +++ b/pkg/server/configurationwatcher.go @@ -224,8 +224,8 @@ func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context, logger.Info("Skipping same configuration") continue } - previousConfig = nextConfig - ring.In() <- nextConfig + previousConfig = *nextConfig.DeepCopy() + ring.In() <- *nextConfig.DeepCopy() } } } diff --git a/pkg/server/configurationwatcher_test.go b/pkg/server/configurationwatcher_test.go index 78bab9769..2078cc032 100644 --- a/pkg/server/configurationwatcher_test.go +++ b/pkg/server/configurationwatcher_test.go @@ -301,3 +301,96 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { assert.Equal(t, expected, publishedProviderConfig) } + +func TestPublishConfigUpdatedByProvider(t *testing.T) { + routinesPool := safe.NewPool(context.Background()) + + pvdConfiguration := dynamic.Configuration{ + TCP: &dynamic.TCPConfiguration{ + Routers: map[string]*dynamic.TCPRouter{ + "foo": {}, + }, + }, + } + + pvd := &mockProvider{ + wait: 10 * time.Millisecond, + messages: []dynamic.Message{ + { + ProviderName: "mock", + Configuration: &pvdConfiguration, + }, + { + ProviderName: "mock", + Configuration: &pvdConfiguration, + }, + }, + } + + watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}) + + publishedConfigCount := 0 + watcher.AddListener(func(configuration dynamic.Configuration) { + publishedConfigCount++ + + // Update the provider configuration published in next dynamic Message which should trigger a new publish. + pvdConfiguration.TCP.Routers["bar"] = &dynamic.TCPRouter{} + }) + + watcher.Start() + defer watcher.Stop() + + // give some time so that the configuration can be processed. + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, 2, publishedConfigCount) +} + +func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { + routinesPool := safe.NewPool(context.Background()) + + pvd := &mockProvider{ + wait: 10 * time.Millisecond, + messages: []dynamic.Message{ + { + ProviderName: "mock", + Configuration: &dynamic.Configuration{ + TCP: &dynamic.TCPConfiguration{ + Routers: map[string]*dynamic.TCPRouter{ + "foo": {}, + }, + }, + }, + }, + { + ProviderName: "mock", + Configuration: &dynamic.Configuration{ + TCP: &dynamic.TCPConfiguration{ + Routers: map[string]*dynamic.TCPRouter{ + "foo": {}, + }, + }, + }, + }, + }, + } + + watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}) + + publishedConfigCount := 0 + watcher.AddListener(func(configuration dynamic.Configuration) { + publishedConfigCount++ + + // Modify the provided configuration. This should not modify the configuration stored in the configuration + // watcher and cause a new publish. + configuration.TCP.Routers["foo@mock"].Rule = "bar" + }) + + watcher.Start() + defer watcher.Stop() + + // give some time so that the configuration can be processed. + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, 1, publishedConfigCount) +}