diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go
index 91a6d4d74..eb57fad52 100644
--- a/cmd/traefik/traefik.go
+++ b/cmd/traefik/traefik.go
@@ -180,8 +180,7 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
 	tlsManager := traefiktls.NewManager()
 	httpChallengeProvider := acme.NewChallengeHTTP()
 
-	// we need to wait at least 2 times the ProvidersThrottleDuration to be sure to handle the challenge.
-	tlsChallengeProvider := acme.NewChallengeTLSALPN(time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration) * 2)
+	tlsChallengeProvider := acme.NewChallengeTLSALPN()
 	err = providerAggregator.AddProvider(tlsChallengeProvider)
 	if err != nil {
 		return nil, err
@@ -265,7 +264,6 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err
 	watcher := server.NewConfigurationWatcher(
 		routinesPool,
 		providerAggregator,
-		time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration),
 		getDefaultsEntrypoints(staticConfiguration),
 		"internal",
 	)
diff --git a/go.mod b/go.mod
index 0a3120aba..4c1e9e998 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,6 @@ require (
 	github.com/docker/docker v20.10.7+incompatible
 	github.com/docker/go-connections v0.4.0
 	github.com/donovanhide/eventsource v0.0.0-20170630084216-b8f31a59085e // indirect
-	github.com/eapache/channels v1.1.0
 	github.com/fatih/structs v1.1.0
 	github.com/gambol99/go-marathon v0.0.0-20180614232016-99a156b96fb2
 	github.com/go-acme/lego/v4 v4.6.0
diff --git a/go.sum b/go.sum
index d056ed2bc..750bc5a7f 100644
--- a/go.sum
+++ b/go.sum
@@ -550,8 +550,6 @@ github.com/donovanhide/eventsource v0.0.0-20170630084216-b8f31a59085e h1:rMOGp6H
 github.com/donovanhide/eventsource v0.0.0-20170630084216-b8f31a59085e/go.mod h1:56wL82FO0bfMU5RvfXoIwSOP2ggqqxT+tAfNEIyxuHw=
 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k=
-github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0=
 github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
 github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
diff --git a/integration/acme_test.go b/integration/acme_test.go
index 116a8788f..16b0b8398 100644
--- a/integration/acme_test.go
+++ b/integration/acme_test.go
@@ -446,26 +446,28 @@ func (s *AcmeSuite) retrieveAcmeCertificate(c *check.C, testCase acmeTestCase) {
 	backend := startTestServer("9010", http.StatusOK, "")
 	defer backend.Close()
 
+	client := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+		},
+	}
+
+	// wait for traefik (generating the acme account takes some seconds)
+	err = try.Do(60*time.Second, func() error {
+		_, errGet := client.Get("https://127.0.0.1:5001")
+		return errGet
+	})
+	c.Assert(err, checker.IsNil)
+
 	for _, sub := range testCase.subCases {
-		client := &http.Client{
-			Transport: &http.Transport{
-				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
-			},
-		}
-
-		// wait for traefik (generating acme account take some seconds)
-		err = try.Do(60*time.Second, func() error {
-			_, errGet := client.Get("https://127.0.0.1:5001")
-			return errGet
-		})
-		c.Assert(err, checker.IsNil)
-
 		client = &http.Client{
 			Transport: &http.Transport{
 				TLSClientConfig: &tls.Config{
 					InsecureSkipVerify: true,
 					ServerName:         sub.host,
 				},
+				// Needed so that each subcase redoes the SSL handshake
+				DisableKeepAlives: true,
 			},
 		}
@@ -479,10 +481,6 @@ func (s *AcmeSuite) retrieveAcmeCertificate(c *check.C, testCase acmeTestCase) {
 		// Retry to send a Request which uses the LE generated certificate
 		err = try.Do(60*time.Second, func() error {
 			resp, err = client.Do(req)
-
-			// /!\ If connection is not closed, SSLHandshake will only be done during the first trial /!\
-			req.Close = true
-
 			if err != nil {
 				return err
 			}
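Editor's note on the DisableKeepAlives change above: with keep-alives on, Go's http.Client reuses the first TLS connection across requests, so only the first request of a sub-case would actually perform a handshake (and thus exercise certificate selection). The following standalone sketch, which is not part of the patch, illustrates the behavior the test now relies on; the endpoint address and the InsecureSkipVerify setting are assumptions borrowed from the test:

package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
)

func main() {
	client := &http.Client{
		Transport: &http.Transport{
			// Assumption carried over from the test: self-signed certs are fine.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			// Force a new connection, and therefore a new TLS handshake,
			// for every request instead of reusing the first connection.
			DisableKeepAlives: true,
		},
	}

	for i := 0; i < 2; i++ {
		resp, err := client.Get("https://127.0.0.1:5001") // endpoint assumed from the test
		if err != nil {
			fmt.Println("request failed:", err)
			return
		}
		// Each response carries the state of its own, freshly negotiated TLS session.
		fmt.Println("handshake complete:", resp.TLS.HandshakeComplete)
		resp.Body.Close()
	}
}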
diff --git a/integration/conf_throttling_test.go b/integration/conf_throttling_test.go
new file mode 100644
index 000000000..6c4333b35
--- /dev/null
+++ b/integration/conf_throttling_test.go
@@ -0,0 +1,105 @@
+package integration
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"regexp"
+	"strconv"
+	"time"
+
+	"github.com/go-check/check"
+	"github.com/traefik/traefik/v2/integration/try"
+	"github.com/traefik/traefik/v2/pkg/config/dynamic"
+	checker "github.com/vdemeester/shakers"
+)
+
+type ThrottlingSuite struct{ BaseSuite }
+
+func (s *ThrottlingSuite) SetUpSuite(c *check.C) {
+	s.createComposeProject(c, "rest")
+	s.composeUp(c)
+}
+
+func (s *ThrottlingSuite) TestThrottleConfReload(c *check.C) {
+	cmd, display := s.traefikCmd(withConfigFile("fixtures/throttling/simple.toml"))
+
+	defer display(c)
+	err := cmd.Start()
+	c.Assert(err, checker.IsNil)
+	defer s.killCmd(cmd)
+
+	// wait for traefik
+	err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 1000*time.Millisecond, try.BodyContains("rest@internal"))
+	c.Assert(err, checker.IsNil)
+
+	// Expect a 404 as we did not configure anything.
+	err = try.GetRequest("http://127.0.0.1:8000/", 1000*time.Millisecond, try.StatusCodeIs(http.StatusNotFound))
+	c.Assert(err, checker.IsNil)
+
+	config := &dynamic.Configuration{
+		HTTP: &dynamic.HTTPConfiguration{
+			Routers: map[string]*dynamic.Router{},
+			Services: map[string]*dynamic.Service{
+				"serviceHTTP": {
+					LoadBalancer: &dynamic.ServersLoadBalancer{
+						Servers: []dynamic.Server{
+							{
+								URL: "http://" + s.getComposeServiceIP(c, "whoami1") + ":80",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	router := &dynamic.Router{
+		EntryPoints: []string{"web"},
+		Middlewares: []string{},
+		Service:     "serviceHTTP",
+		Rule:        "PathPrefix(`/`)",
+	}
+
+	confChanges := 10
+
+	for i := 0; i < confChanges; i++ {
+		config.HTTP.Routers[fmt.Sprintf("routerHTTP%d", i)] = router
+		data, err := json.Marshal(config)
+		c.Assert(err, checker.IsNil)
+
+		request, err := http.NewRequest(http.MethodPut, "http://127.0.0.1:8080/api/providers/rest", bytes.NewReader(data))
+		c.Assert(err, checker.IsNil)
+
+		response, err := http.DefaultClient.Do(request)
+		c.Assert(err, checker.IsNil)
+		c.Assert(response.StatusCode, checker.Equals, http.StatusOK)
+		time.Sleep(200 * time.Millisecond)
+	}
+
+	reloadsRegexp := regexp.MustCompile(`traefik_config_reloads_total (\d*)\n`)
+
+	resp, err := http.Get("http://127.0.0.1:8080/metrics")
+	c.Assert(err, checker.IsNil)
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	c.Assert(err, checker.IsNil)
+
+	fields := reloadsRegexp.FindStringSubmatch(string(body))
+	c.Assert(len(fields), checker.Equals, 2)
+
+	reloads, err := strconv.Atoi(fields[1])
+	c.Assert(err, checker.IsNil)
+
+	// The test tries to trigger a config reload with the REST API every 200ms,
+	// 10 times (so for 2s in total).
+	// Therefore the throttling (set at 400ms for this test) should only let
+	// (2s / 400ms =) 5 config reloads happen in theory.
+	// In addition, we have to take into account the extra config reload from
+	// the internal provider (5 + 1).
+	c.Assert(reloads, checker.LessOrEqualThan, 6)
+}
diff --git a/integration/fixtures/throttling/simple.toml b/integration/fixtures/throttling/simple.toml
new file mode 100644
index 000000000..889a35d09
--- /dev/null
+++ b/integration/fixtures/throttling/simple.toml
@@ -0,0 +1,22 @@
+[global]
+  checkNewVersion = false
+  sendAnonymousUsage = false
+
+[log]
+  level = "DEBUG"
+
+[entryPoints]
+  [entryPoints.web]
+    address = ":8000"
+
+[api]
+  insecure = true
+
+[providers]
+  providersThrottleDuration = "400ms"
+  [providers.rest]
+    insecure = true
+
+[metrics]
+  [metrics.prometheus]
+    buckets = [0.1,0.3,1.2,5.0]
diff --git a/integration/integration_test.go b/integration/integration_test.go
index 646ec2d08..c7e99d916 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -68,6 +68,7 @@ func Test(t *testing.T) {
 	check.Suite(&SimpleSuite{})
 	check.Suite(&TCPSuite{})
 	check.Suite(&TimeoutSuite{})
+	check.Suite(&ThrottlingSuite{})
 	check.Suite(&TLSClientHeadersSuite{})
 	check.Suite(&TracingSuite{})
 	check.Suite(&UDPSuite{})
diff --git a/pkg/provider/acme/challenge_tls.go b/pkg/provider/acme/challenge_tls.go
index b7f97516d..6249bc52e 100644
--- a/pkg/provider/acme/challenge_tls.go
+++ b/pkg/provider/acme/challenge_tls.go
@@ -17,8 +17,6 @@ const providerNameALPN = "tlsalpn.acme"
 
 // ChallengeTLSALPN TLSALPN challenge provider implements challenge.Provider.
 type ChallengeTLSALPN struct {
-	Timeout time.Duration
-
 	chans   map[string]chan struct{}
 	muChans sync.Mutex
@@ -29,11 +27,10 @@ type ChallengeTLSALPN struct {
 }
 
 // NewChallengeTLSALPN creates a new ChallengeTLSALPN.
-func NewChallengeTLSALPN(timeout time.Duration) *ChallengeTLSALPN {
+func NewChallengeTLSALPN() *ChallengeTLSALPN {
 	return &ChallengeTLSALPN{
-		Timeout: timeout,
-		chans:   make(map[string]chan struct{}),
-		certs:   make(map[string]*Certificate),
+		chans: make(map[string]chan struct{}),
+		certs: make(map[string]*Certificate),
 	}
 }
@@ -61,12 +58,13 @@ func (c *ChallengeTLSALPN) Present(domain, _, keyAuth string) error {
 	c.configurationChan <- conf
 
-	timer := time.NewTimer(c.Timeout)
+	// Present should return when its dynamic configuration has been received and applied by Traefik.
+	// The timer exists in case the above does not happen, to ensure the challenge cleanup.
+	timer := time.NewTimer(time.Minute)
+	defer timer.Stop()
 
 	select {
 	case t := <-timer.C:
-		timer.Stop()
-
 		c.muChans.Lock()
 		c.cleanChan(string(certPEMBlock))
 		c.muChans.Unlock()
@@ -103,6 +101,11 @@ func (c *ChallengeTLSALPN) Init() error {
 	return nil
 }
 
+// ThrottleDuration returns the throttle duration.
+func (c *ChallengeTLSALPN) ThrottleDuration() time.Duration {
+	return 0
+}
+
 // Provide allows the provider to provide configurations to traefik using the given configuration channel.
 func (c *ChallengeTLSALPN) Provide(configurationChan chan<- dynamic.Message, _ *safe.Pool) error {
 	c.configurationChan = configurationChan
diff --git a/pkg/provider/acme/provider.go b/pkg/provider/acme/provider.go
index bbc95d516..6a8eda985 100644
--- a/pkg/provider/acme/provider.go
+++ b/pkg/provider/acme/provider.go
@@ -180,6 +180,11 @@ func isAccountMatchingCaServer(ctx context.Context, accountURI, serverURI string
 	return cau.Hostname() == aru.Hostname()
 }
 
+// ThrottleDuration returns the throttle duration.
+func (p *Provider) ThrottleDuration() time.Duration {
+	return 0
+}
+
 // Provide allows the file provider to provide configurations to traefik
 // using the given Configuration channel.
 func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
diff --git a/pkg/provider/aggregator/aggregator.go b/pkg/provider/aggregator/aggregator.go
index 0d58f76af..21109a184 100644
--- a/pkg/provider/aggregator/aggregator.go
+++ b/pkg/provider/aggregator/aggregator.go
@@ -1,6 +1,9 @@
 package aggregator
 
 import (
+	"context"
+	"time"
+
 	"github.com/traefik/traefik/v2/pkg/config/dynamic"
 	"github.com/traefik/traefik/v2/pkg/config/static"
 	"github.com/traefik/traefik/v2/pkg/log"
@@ -11,16 +14,63 @@ import (
 	"github.com/traefik/traefik/v2/pkg/safe"
 )
 
+// throttled defines what kind of config refresh throttling the aggregator should
+// set up for a given provider.
+// If a provider implements throttled, the configuration changes it sends will be
+// taken into account no more often than the frequency inferred from ThrottleDuration().
+// If ThrottleDuration returns zero, no throttling will take place.
+// If throttled is not implemented, the throttling will be set up in accordance
+// with the global providersThrottleDuration option.
+type throttled interface {
+	ThrottleDuration() time.Duration
+}
+
+// maybeThrottledProvide returns the Provide method of the given provider,
+// potentially augmented with some throttling depending on whether and how the
+// provider implements the throttled interface.
+func maybeThrottledProvide(prd provider.Provider, defaultDuration time.Duration) func(chan<- dynamic.Message, *safe.Pool) error {
+	providerThrottleDuration := defaultDuration
+	if throttled, ok := prd.(throttled); ok {
+		// per-provider throttling
+		providerThrottleDuration = throttled.ThrottleDuration()
+	}
+
+	if providerThrottleDuration == 0 {
+		// throttling disabled
+		return prd.Provide
+	}
+
+	return func(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
+		rc := newRingChannel()
+		pool.GoCtx(func(ctx context.Context) {
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case msg := <-rc.out():
+					configurationChan <- msg
+					time.Sleep(providerThrottleDuration)
+				}
+			}
+		})
+
+		return prd.Provide(rc.in(), pool)
+	}
+}
+
 // ProviderAggregator aggregates providers.
 type ProviderAggregator struct {
-	internalProvider provider.Provider
-	fileProvider     provider.Provider
-	providers        []provider.Provider
+	internalProvider          provider.Provider
+	fileProvider              provider.Provider
+	providers                 []provider.Provider
+	providersThrottleDuration time.Duration
 }
 
 // NewProviderAggregator returns an aggregate of all the providers configured in the static configuration.
 func NewProviderAggregator(conf static.Providers) ProviderAggregator {
-	p := ProviderAggregator{}
+	p := ProviderAggregator{
+		providersThrottleDuration: time.Duration(conf.ProvidersThrottleDuration),
+	}
 
 	if conf.File != nil {
 		p.quietAddProvider(conf.File)
@@ -119,26 +169,26 @@ func (p ProviderAggregator) Init() error {
 
 // Provide calls the provide method of every providers.
 func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
 	if p.fileProvider != nil {
-		launchProvider(configurationChan, pool, p.fileProvider)
+		p.launchProvider(configurationChan, pool, p.fileProvider)
 	}
 
 	for _, prd := range p.providers {
 		prd := prd
 		safe.Go(func() {
-			launchProvider(configurationChan, pool, prd)
+			p.launchProvider(configurationChan, pool, prd)
 		})
 	}
 
 	// internal provider must be the last because we use it to know if all the providers are loaded.
 	// ConfigurationWatcher will wait for this requiredProvider before applying configurations.
 	if p.internalProvider != nil {
-		launchProvider(configurationChan, pool, p.internalProvider)
+		p.launchProvider(configurationChan, pool, p.internalProvider)
 	}
 
 	return nil
 }
 
-func launchProvider(configurationChan chan<- dynamic.Message, pool *safe.Pool, prd provider.Provider) {
+func (p ProviderAggregator) launchProvider(configurationChan chan<- dynamic.Message, pool *safe.Pool, prd provider.Provider) {
 	jsonConf, err := redactor.RemoveCredentials(prd)
 	if err != nil {
 		log.WithoutContext().Debugf("Cannot marshal the provider configuration %T: %v", prd, err)
@@ -147,9 +197,8 @@ func launchProvider(configurationChan chan<- dynamic.Message, pool *safe.Pool, p
 	log.WithoutContext().Infof("Starting provider %T", prd)
 	log.WithoutContext().Debugf("%T provider configuration: %s", prd, jsonConf)
 
-	currentProvider := prd
-	err = currentProvider.Provide(configurationChan, pool)
-	if err != nil {
+	if err := maybeThrottledProvide(prd, p.providersThrottleDuration)(configurationChan, pool); err != nil {
 		log.WithoutContext().Errorf("Cannot start the provider %T: %v", prd, err)
+		return
 	}
 }
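To make the throttled contract above concrete, here is a minimal sketch, not part of the patch, of how a provider can opt into its own refresh rate. The provider name is hypothetical; returning 0 instead would disable throttling for it entirely, and not implementing the method at all falls back to the global providersThrottleDuration:

package example

import (
	"time"

	"github.com/traefik/traefik/v2/pkg/config/dynamic"
	"github.com/traefik/traefik/v2/pkg/safe"
)

// chattyProvider is a hypothetical provider that emits frequent updates
// and therefore asks the aggregator for a 5s per-provider throttle.
type chattyProvider struct{}

func (p *chattyProvider) Init() error { return nil }

func (p *chattyProvider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
	// ... watch a backend and push dynamic.Message values ...
	return nil
}

// ThrottleDuration makes chattyProvider satisfy the (unexported) throttled
// interface: its configurations are applied at most once every 5 seconds.
func (p *chattyProvider) ThrottleDuration() time.Duration {
	return 5 * time.Second
}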
diff --git a/pkg/provider/aggregator/ring_channel.go b/pkg/provider/aggregator/ring_channel.go
new file mode 100644
index 000000000..e69328da1
--- /dev/null
+++ b/pkg/provider/aggregator/ring_channel.go
@@ -0,0 +1,71 @@
+package aggregator
+
+import (
+	"github.com/traefik/traefik/v2/pkg/config/dynamic"
+)
+
+// RingChannel implements a channel in a way that never blocks the writer.
+// Specifically, if a value is written to a RingChannel when its buffer is full then the oldest
+// value in the buffer is discarded to make room (just like a standard ring-buffer).
+// Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling
+// the writer before the reader, so caveat emptor.
+type RingChannel struct {
+	input, output chan dynamic.Message
+	buffer        *dynamic.Message
+}
+
+func newRingChannel() *RingChannel {
+	ch := &RingChannel{
+		input:  make(chan dynamic.Message),
+		output: make(chan dynamic.Message),
+	}
+	go ch.ringBuffer()
+	return ch
+}
+
+func (ch *RingChannel) in() chan<- dynamic.Message {
+	return ch.input
+}
+
+func (ch *RingChannel) out() <-chan dynamic.Message {
+	return ch.output
+}
+
+// ringBuffer shuttles values from input to output, keeping at most one
+// element buffered (the most recent one).
+func (ch *RingChannel) ringBuffer() {
+	var input, output chan dynamic.Message
+	var next dynamic.Message
+	input = ch.input
+
+	for input != nil || output != nil {
+		select {
+		// Prefer to write if possible, which is surprisingly effective in reducing
+		// dropped elements due to overflow. The naive read/write select chooses randomly
+		// when both channels are ready, which produces unnecessary drops 50% of the time.
+		case output <- next:
+			ch.buffer = nil
+		default:
+			select {
+			case elem, open := <-input:
+				if !open {
+					input = nil
+					break
+				}
+
+				ch.buffer = &elem
+			case output <- next:
+				ch.buffer = nil
+			}
+		}
+
+		if ch.buffer == nil {
+			output = nil
+			continue
+		}
+
+		output = ch.output
+		next = *ch.buffer
+	}
+
+	close(ch.output)
+}
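As a sanity check of the semantics guaranteed above, here is a self-contained sketch (plain int values standing in for dynamic.Message, so it is not the real type) showing that the writer is never blocked and that a slow reader only ever observes the most recent value:

package main

import "fmt"

func main() {
	in := make(chan int)
	out := make(chan int)

	// Same nil-channel loop as ringBuffer above, specialized to int:
	// at most one value is buffered, and a newer value overwrites it.
	go func() {
		var next int
		var hasNext bool
		input := in
		var output chan int
		for input != nil || output != nil {
			select {
			case v, ok := <-input:
				if !ok {
					input = nil
					break
				}
				next, hasNext = v, true // drop-oldest: any unread value is overwritten
			case output <- next:
				hasNext = false
			}
			if hasNext {
				output = out // something to publish: arm the send case
			} else {
				output = nil // sends on a nil channel never proceed
			}
		}
		close(out)
	}()

	for i := 1; i <= 5; i++ {
		in <- i // never blocks on the (absent) reader
	}
	close(in)

	fmt.Println(<-out) // prints 5: values 1 through 4 were discarded
}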
diff --git a/pkg/provider/traefik/internal.go b/pkg/provider/traefik/internal.go
index 322d67780..47d2c3023 100644
--- a/pkg/provider/traefik/internal.go
+++ b/pkg/provider/traefik/internal.go
@@ -6,6 +6,7 @@ import (
 	"math"
 	"net"
 	"regexp"
+	"time"
 
 	"github.com/traefik/traefik/v2/pkg/config/dynamic"
 	"github.com/traefik/traefik/v2/pkg/config/static"
@@ -29,6 +30,11 @@ func New(staticCfg static.Configuration) *Provider {
 	return &Provider{staticCfg: staticCfg}
 }
 
+// ThrottleDuration returns the throttle duration.
+func (i Provider) ThrottleDuration() time.Duration {
+	return 0
+}
+
 // Provide allows the provider to provide configurations to traefik using the given configuration channel.
 func (i *Provider) Provide(configurationChan chan<- dynamic.Message, _ *safe.Pool) error {
 	ctx := log.With(context.Background(), log.Str(log.ProviderName, "internal"))
diff --git a/pkg/server/aggregator.go b/pkg/server/aggregator.go
index 33c11e00f..4b50829e8 100644
--- a/pkg/server/aggregator.go
+++ b/pkg/server/aggregator.go
@@ -9,6 +9,8 @@ import (
 )
 
 func mergeConfiguration(configurations dynamic.Configurations, defaultEntryPoints []string) dynamic.Configuration {
+	// TODO: see if we can use DeepCopies inside, so that the given argument is left
+	// untouched, and the modified copy is returned.
 	conf := dynamic.Configuration{
 		HTTP: &dynamic.HTTPConfiguration{
 			Routers: make(map[string]*dynamic.Router),
diff --git a/pkg/server/configurationwatcher.go b/pkg/server/configurationwatcher.go
index f302d7571..4d6798694 100644
--- a/pkg/server/configurationwatcher.go
+++ b/pkg/server/configurationwatcher.go
@@ -4,9 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"reflect"
-	"time"
 
-	"github.com/eapache/channels"
 	"github.com/sirupsen/logrus"
 	"github.com/traefik/traefik/v2/pkg/config/dynamic"
 	"github.com/traefik/traefik/v2/pkg/log"
@@ -17,17 +15,13 @@ import (
 
 // ConfigurationWatcher watches configuration changes.
 type ConfigurationWatcher struct {
-	provider provider.Provider
+	providerAggregator provider.Provider
 
 	defaultEntryPoints []string
 
-	providersThrottleDuration time.Duration
+	allProvidersConfigs chan dynamic.Message
 
-	currentConfigurations safe.Safe
-
-	configurationChan          chan dynamic.Message
-	configurationValidatedChan chan dynamic.Message
-	providerConfigUpdateMap    map[string]chan dynamic.Message
+	newConfigs chan dynamic.Configurations
 
 	requiredProvider       string
 	configurationListeners []func(dynamic.Configuration)
@@ -39,38 +33,30 @@ type ConfigurationWatcher struct {
 func NewConfigurationWatcher(
 	routinesPool *safe.Pool,
 	pvd provider.Provider,
-	providersThrottleDuration time.Duration,
 	defaultEntryPoints []string,
 	requiredProvider string,
 ) *ConfigurationWatcher {
-	watcher := &ConfigurationWatcher{
-		provider:                   pvd,
-		configurationChan:          make(chan dynamic.Message, 100),
-		configurationValidatedChan: make(chan dynamic.Message, 100),
-		providerConfigUpdateMap:    make(map[string]chan dynamic.Message),
-		providersThrottleDuration:  providersThrottleDuration,
-		routinesPool:               routinesPool,
-		defaultEntryPoints:         defaultEntryPoints,
-		requiredProvider:           requiredProvider,
+	return &ConfigurationWatcher{
+		providerAggregator:  pvd,
+		allProvidersConfigs: make(chan dynamic.Message, 100),
+		newConfigs:          make(chan dynamic.Configurations),
+		routinesPool:        routinesPool,
+		defaultEntryPoints:  defaultEntryPoints,
+		requiredProvider:    requiredProvider,
 	}
-
-	currentConfigurations := make(dynamic.Configurations)
-	watcher.currentConfigurations.Set(currentConfigurations)
-
-	return watcher
 }
 
 // Start the configuration watcher.
 func (c *ConfigurationWatcher) Start() {
-	c.routinesPool.GoCtx(c.listenProviders)
-	c.routinesPool.GoCtx(c.listenConfigurations)
-	c.startProvider()
+	c.routinesPool.GoCtx(c.receiveConfigurations)
+	c.routinesPool.GoCtx(c.applyConfigurations)
+	c.startProviderAggregator()
 }
 
 // Stop the configuration watcher.
 func (c *ConfigurationWatcher) Stop() {
-	close(c.configurationChan)
-	close(c.configurationValidatedChan)
+	close(c.allProvidersConfigs)
+	close(c.newConfigs)
 }
 
 // AddListener adds a new listener function used when new configuration is provided.
@@ -81,180 +67,159 @@ func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration))
 	c.configurationListeners = append(c.configurationListeners, listener)
 }
 
-func (c *ConfigurationWatcher) startProvider() {
+func (c *ConfigurationWatcher) startProviderAggregator() {
 	logger := log.WithoutContext()
-	logger.Infof("Starting provider %T", c.provider)
-
-	currentProvider := c.provider
+	logger.Infof("Starting provider aggregator %T", c.providerAggregator)
 
 	safe.Go(func() {
-		err := currentProvider.Provide(c.configurationChan, c.routinesPool)
+		err := c.providerAggregator.Provide(c.allProvidersConfigs, c.routinesPool)
 		if err != nil {
-			logger.Errorf("Error starting provider %T: %s", currentProvider, err)
+			logger.Errorf("Error starting provider aggregator %T: %s", c.providerAggregator, err)
 		}
 	})
 }
 
-// listenProviders receives configuration changes from the providers.
-// The configuration message then gets passed along a series of check
-// to finally end up in a throttler that sends it to listenConfigurations (through c. configurationValidatedChan).
-func (c *ConfigurationWatcher) listenProviders(ctx context.Context) {
+// receiveConfigurations receives configuration changes from the providers.
+// The configuration message then gets passed along a series of checks, notably
+// to verify that, for a given provider, the configuration that was just received
+// is at least different from the previously received one.
+// The full set of configurations is then sent to the applying goroutine
+// (applyConfigurations) in a non-blocking way: the nil-able output channel in the
+// select below lets us constantly hand over the last global state we are aware of.
+func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
+	newConfigurations := make(dynamic.Configurations)
+	var output chan dynamic.Configurations
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case configMsg, ok := <-c.configurationChan:
+		// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
+		case output <- newConfigurations.DeepCopy():
+			output = nil
+
+		default:
+			select {
+			case <-ctx.Done():
+				return
+			case configMsg, ok := <-c.allProvidersConfigs:
+				if !ok {
+					return
+				}
+
+				logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
+
+				if configMsg.Configuration == nil {
+					logger.Debug("Skipping nil configuration.")
+					continue
+				}
+
+				if isEmptyConfiguration(configMsg.Configuration) {
+					logger.Debug("Skipping empty configuration.")
+					continue
+				}
+
+				logConfiguration(logger, configMsg)
+
+				if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) {
+					// no change, do nothing
+					logger.Debug("Skipping unchanged configuration.")
+					continue
+				}
+
+				newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy()
+
+				output = c.newConfigs
+
+			// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
+			case output <- newConfigurations.DeepCopy():
+				output = nil
+			}
+		}
+	}
+}
+
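A side note on the two DeepCopy calls above: Go maps travel by reference, so without a copy the consumer of newConfigs could observe later mutations made by this goroutine. A toy illustration of the aliasing hazard, with a plain map[string]string standing in for dynamic.Configurations:

package main

import "fmt"

func main() {
	ch := make(chan map[string]string, 1)

	state := map[string]string{"mock": "v1"}
	ch <- state          // sends a reference, not a snapshot
	state["mock"] = "v2" // the producer keeps mutating its working state

	fmt.Println((<-ch)["mock"]) // prints "v2": the "v1" snapshot was silently lost

	// Copying before sending preserves the snapshot:
	snapshot := make(map[string]string, len(state))
	for k, v := range state {
		snapshot[k] = v
	}
	ch <- snapshot
	state["mock"] = "v3"
	fmt.Println((<-ch)["mock"]) // prints "v2"
}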
+// applyConfigurations blocks on the newConfigs channel: it receives the new
+// set of configurations that is compiled and sent by receiveConfigurations as soon
+// as a provider change occurs. If the new set is different from the previous set
+// that had been applied, the new set is applied, and we sleep for a while before
+// listening on the channel again.
+func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) {
+	var lastConfigurations dynamic.Configurations
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case newConfigs, ok := <-c.newConfigs:
 			if !ok {
 				return
 			}
 
-			if configMsg.Configuration == nil {
-				log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName).
-					Debug("Received nil configuration from provider, skipping.")
-				return
+			// We wait for the first configuration of the required provider before applying configurations.
+			if _, ok := newConfigs[c.requiredProvider]; c.requiredProvider != "" && !ok {
+				continue
 			}
 
-			c.preLoadConfiguration(configMsg)
+			if reflect.DeepEqual(newConfigs, lastConfigurations) {
+				continue
+			}
+
+			conf := mergeConfiguration(newConfigs.DeepCopy(), c.defaultEntryPoints)
+			conf = applyModel(conf)
+
+			for _, listener := range c.configurationListeners {
+				listener(conf)
+			}
+
+			lastConfigurations = newConfigs
 		}
 	}
 }
 
-func (c *ConfigurationWatcher) listenConfigurations(ctx context.Context) {
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case configMsg, ok := <-c.configurationValidatedChan:
-			if !ok || configMsg.Configuration == nil {
-				return
-			}
-			c.loadMessage(configMsg)
-		}
-	}
-}
-
-func (c *ConfigurationWatcher) loadMessage(configMsg dynamic.Message) {
-	currentConfigurations := c.currentConfigurations.Get().(dynamic.Configurations)
-
-	// Copy configurations to new map so we don't change current if LoadConfig fails
-	newConfigurations := currentConfigurations.DeepCopy()
-	newConfigurations[configMsg.ProviderName] = configMsg.Configuration
-
-	c.currentConfigurations.Set(newConfigurations)
-
-	conf := mergeConfiguration(newConfigurations, c.defaultEntryPoints)
-	conf = applyModel(conf)
-
-	// We wait for first configuration of the require provider before applying configurations.
-	if _, ok := newConfigurations[c.requiredProvider]; c.requiredProvider == "" || ok {
-		for _, listener := range c.configurationListeners {
-			listener(conf)
-		}
-	}
-}
-
-func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
-	logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
-	if log.GetLevel() == logrus.DebugLevel {
-		copyConf := configMsg.Configuration.DeepCopy()
-		if copyConf.TLS != nil {
-			copyConf.TLS.Certificates = nil
-
-			if copyConf.TLS.Options != nil {
-				cleanedOptions := make(map[string]tls.Options, len(copyConf.TLS.Options))
-				for name, option := range copyConf.TLS.Options {
-					option.ClientAuth.CAFiles = []tls.FileOrContent{}
-					cleanedOptions[name] = option
-				}
-
-				copyConf.TLS.Options = cleanedOptions
-			}
-
-			for k := range copyConf.TLS.Stores {
-				st := copyConf.TLS.Stores[k]
-				st.DefaultCertificate = nil
-				copyConf.TLS.Stores[k] = st
-			}
-		}
-
-		if copyConf.HTTP != nil {
-			for _, transport := range copyConf.HTTP.ServersTransports {
-				transport.Certificates = tls.Certificates{}
-				transport.RootCAs = []tls.FileOrContent{}
-			}
-		}
-
-		jsonConf, err := json.Marshal(copyConf)
-		if err != nil {
-			logger.Errorf("Could not marshal dynamic configuration: %v", err)
-			logger.Debugf("Configuration received from provider %s: [struct] %#v", configMsg.ProviderName, copyConf)
-		} else {
-			logger.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
-		}
-	}
-
-	if isEmptyConfiguration(configMsg.Configuration) {
-		logger.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
+func logConfiguration(logger log.Logger, configMsg dynamic.Message) {
+	if log.GetLevel() != logrus.DebugLevel {
 		return
 	}
 
-	providerConfigUpdateCh, ok := c.providerConfigUpdateMap[configMsg.ProviderName]
-	if !ok {
-		providerConfigUpdateCh = make(chan dynamic.Message)
-		c.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
-		c.routinesPool.GoCtx(func(ctxPool context.Context) {
-			c.throttleProviderConfigReload(ctxPool, c.providersThrottleDuration, c.configurationValidatedChan, providerConfigUpdateCh)
-		})
+	copyConf := configMsg.Configuration.DeepCopy()
+	if copyConf.TLS != nil {
+		copyConf.TLS.Certificates = nil
+
+		if copyConf.TLS.Options != nil {
+			cleanedOptions := make(map[string]tls.Options, len(copyConf.TLS.Options))
+			for name, option := range copyConf.TLS.Options {
+				option.ClientAuth.CAFiles = []tls.FileOrContent{}
+				cleanedOptions[name] = option
+			}
+
+			copyConf.TLS.Options = cleanedOptions
+		}
+
+		for k := range copyConf.TLS.Stores {
+			st := copyConf.TLS.Stores[k]
+			st.DefaultCertificate = nil
+			copyConf.TLS.Stores[k] = st
+		}
 	}
 
-	providerConfigUpdateCh <- configMsg
-}
-
-// throttleProviderConfigReload throttles the configuration reload speed for a single provider.
-// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
-// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
-// it will publish the last of the newly received configurations.
-func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context, throttle time.Duration, publish chan<- dynamic.Message, in <-chan dynamic.Message) {
-	ring := channels.NewRingChannel(1)
-	defer ring.Close()
-
-	c.routinesPool.GoCtx(func(ctxPool context.Context) {
-		for {
-			select {
-			case <-ctxPool.Done():
-				return
-			case nextConfig := <-ring.Out():
-				if config, ok := nextConfig.(dynamic.Message); ok {
-					publish <- config
-					time.Sleep(throttle)
-				}
-			}
+	if copyConf.HTTP != nil {
+		for _, transport := range copyConf.HTTP.ServersTransports {
+			transport.Certificates = tls.Certificates{}
+			transport.RootCAs = []tls.FileOrContent{}
 		}
-	})
+	}
 
-	var previousConfig dynamic.Message
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case nextConfig := <-in:
-			if reflect.DeepEqual(previousConfig, nextConfig) {
-				logger := log.WithoutContext().WithField(log.ProviderName, nextConfig.ProviderName)
-				logger.Debug("Skipping same configuration")
-				continue
-			}
-			previousConfig = *nextConfig.DeepCopy()
-			ring.In() <- *nextConfig.DeepCopy()
-		}
+	jsonConf, err := json.Marshal(copyConf)
+	if err != nil {
+		logger.Errorf("Could not marshal dynamic configuration: %v", err)
+		logger.Debugf("Configuration received: [struct] %#v", copyConf)
+	} else {
+		logger.Debugf("Configuration received: %s", string(jsonConf))
 	}
 }
 
 func isEmptyConfiguration(conf *dynamic.Configuration) bool {
-	if conf == nil {
-		return true
-	}
-
 	if conf.TCP == nil {
 		conf.TCP = &dynamic.TCPConfiguration{}
 	}
diff --git a/pkg/server/configurationwatcher_test.go b/pkg/server/configurationwatcher_test.go
index 7b885cd22..11b783e58 100644
--- a/pkg/server/configurationwatcher_test.go
+++ b/pkg/server/configurationwatcher_test.go
@@ -4,43 +4,62 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/traefik/traefik/v2/pkg/config/dynamic"
+	"github.com/traefik/traefik/v2/pkg/provider/aggregator"
 	"github.com/traefik/traefik/v2/pkg/safe"
 	th "github.com/traefik/traefik/v2/pkg/testhelpers"
 	"github.com/traefik/traefik/v2/pkg/tls"
 )
 
 type mockProvider struct {
-	messages []dynamic.Message
-	wait     time.Duration
+	messages         []dynamic.Message
+	wait             time.Duration
+	first            chan struct{}
+	throttleDuration time.Duration
 }
 
 func (p *mockProvider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
-	for _, message := range p.messages {
-		configurationChan <- message
+	wait := p.wait
+	if wait == 0 {
+		wait = 20 * time.Millisecond
+	}
 
-		wait := p.wait
-		if wait == 0 {
-			wait = 20 * time.Millisecond
-		}
+	if len(p.messages) == 0 {
+		return fmt.Errorf("no messages available")
+	}
 
-		fmt.Println("wait", wait, time.Now().Nanosecond())
+	configurationChan <- p.messages[0]
+
+	if p.first != nil {
+		<-p.first
+	}
+
+	for _, message := range p.messages[1:] {
 		time.Sleep(wait)
+		configurationChan <- message
 	}
 
 	return nil
 }
 
+// ThrottleDuration returns the throttle duration.
+func (p mockProvider) ThrottleDuration() time.Duration {
+	return p.throttleDuration
+}
+
 func (p *mockProvider) Init() error {
-	panic("implement me")
+	return nil
 }
 
 func TestNewConfigurationWatcher(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
 	pvd := &mockProvider{
 		messages: []dynamic.Message{{
 			ProviderName: "mock",
@@ -55,7 +74,7 @@ func TestNewConfigurationWatcher(t *testing.T) {
 		}},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
 
 	run := make(chan struct{})
@@ -100,11 +119,147 @@ func TestNewConfigurationWatcher(t *testing.T) {
 	<-run
 }
 
+func TestWaitForRequiredProvider(t *testing.T) {
+	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
+	pvdAggregator := &mockProvider{
+		wait: 5 * time.Millisecond,
+	}
+
+	config := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo")),
+			th.WithLoadBalancerServices(th.WithService("bar")),
+		),
+	}
+
+	pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
+		ProviderName:  "mock",
+		Configuration: config,
+	})
+
+	pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
+		ProviderName:  "required",
+		Configuration: config,
+	})
+
+	pvdAggregator.messages = append(pvdAggregator.messages, dynamic.Message{
+		ProviderName:  "mock2",
+		Configuration: config,
+	})
+
+	watcher := NewConfigurationWatcher(routinesPool, pvdAggregator, []string{}, "required")
+
+	publishedConfigCount := 0
+	watcher.AddListener(func(_ dynamic.Configuration) {
+		publishedConfigCount++
+	})
+
+	watcher.Start()
+	defer watcher.Stop()
+
+	// give some time so that the configuration can be processed
+	time.Sleep(20 * time.Millisecond)
+
+	// after 20 milliseconds we should have 2 configs published
+	assert.Equal(t, 2, publishedConfigCount, "times configs were published")
+}
+
+func TestIgnoreTransientConfiguration(t *testing.T) {
+	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
+	config := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo")),
+			th.WithLoadBalancerServices(th.WithService("bar")),
+		),
+	}
+
+	config2 := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("baz")),
+			th.WithLoadBalancerServices(th.WithService("toto")),
+		),
+	}
+
+	watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{"defaultEP"}, "")
+
+	publishedConfigCount := 0
+	var lastConfig dynamic.Configuration
+	blockConfConsumer := make(chan struct{})
+	watcher.AddListener(func(config dynamic.Configuration) {
+		publishedConfigCount++
+		lastConfig = config
+		<-blockConfConsumer
+	})
+
+	watcher.Start()
+	defer watcher.Stop()
+
+	watcher.allProvidersConfigs <- dynamic.Message{
+		ProviderName:  "mock",
+		Configuration: config,
+	}
+
+	watcher.allProvidersConfigs <- dynamic.Message{
+		ProviderName:  "mock",
+		Configuration: config2,
+	}
+
+	watcher.allProvidersConfigs <- dynamic.Message{
+		ProviderName:  "mock",
+		Configuration: config,
+	}
+
+	close(blockConfConsumer)
+
+	// give some time so that the configuration can be processed
+	time.Sleep(20 * time.Millisecond)
+
+	// after 20 milliseconds we should have 1 config published
+	assert.Equal(t, 1, publishedConfigCount, "times configs were published")
+
+	expected := dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("defaultEP"))),
+			th.WithLoadBalancerServices(th.WithService("bar@mock")),
+			th.WithMiddlewares(),
+		),
+		TCP: &dynamic.TCPConfiguration{
+			Routers:     map[string]*dynamic.TCPRouter{},
+			Middlewares: map[string]*dynamic.TCPMiddleware{},
+			Services:    map[string]*dynamic.TCPService{},
+		},
+		UDP: &dynamic.UDPConfiguration{
+			Routers:  map[string]*dynamic.UDPRouter{},
+			Services: map[string]*dynamic.UDPService{},
+		},
+		TLS: &dynamic.TLSConfiguration{
+			Options: map[string]tls.Options{
+				"default": {
+					ALPNProtocols: []string{
+						"h2",
+						"http/1.1",
+						"acme-tls/1",
+					},
+				},
+			},
+			Stores: map[string]tls.Store{},
+		},
+	}
+
+	assert.Equal(t, expected, lastConfig)
+}
+
 func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
 
 	pvd := &mockProvider{
-		wait: 10 * time.Millisecond,
+		wait:             10 * time.Millisecond,
+		throttleDuration: 30 * time.Millisecond,
 	}
 
 	for i := 0; i < 5; i++ {
@@ -119,7 +274,11 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
 		})
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
+	providerAggregator := aggregator.ProviderAggregator{}
+	err := providerAggregator.AddProvider(pvd)
+	assert.Nil(t, err)
+
+	watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{}, "")
 
 	publishedConfigCount := 0
 	watcher.AddListener(func(_ dynamic.Configuration) {
@@ -129,24 +288,28 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) {
 	watcher.Start()
 	defer watcher.Stop()
 
-	// give some time so that the configuration can be processed
+	// Give some time so that the configuration can be processed.
 	time.Sleep(100 * time.Millisecond)
 
-	// after 50 milliseconds 5 new configs were published
-	// with a throttle duration of 30 milliseconds this means, we should have received 3 new configs
-	assert.Equal(t, 3, publishedConfigCount, "times configs were published")
+	// To load 5 new configs it would require 150ms (5 configs * 30ms).
+	// In 100ms, we should only have time to load 3 configs.
+	assert.LessOrEqual(t, publishedConfigCount, 3, "config was applied too many times")
+	assert.Greater(t, publishedConfigCount, 0, "config was not applied at least once")
 }
 
 func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
 	pvd := &mockProvider{
 		messages: []dynamic.Message{{ProviderName: "mock"}},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
 	watcher.AddListener(func(_ dynamic.Configuration) {
 		t.Error("An empty configuration was published but it should not")
 	})
+
 	watcher.Start()
 	defer watcher.Stop()
 
@@ -156,6 +319,8 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
 
 func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
 	message := dynamic.Message{
 		ProviderName: "mock",
 		Configuration: &dynamic.Configuration{
@@ -165,18 +330,16 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
 			),
 		},
 	}
+
 	pvd := &mockProvider{
 		messages: []dynamic.Message{message, message},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
 
-	alreadyCalled := false
+	var configurationReloads int
 	watcher.AddListener(func(_ dynamic.Configuration) {
-		if alreadyCalled {
-			t.Error("Same configuration should not be published multiple times")
-		}
-		alreadyCalled = true
+		configurationReloads++
 	})
 
 	watcher.Start()
@@ -184,10 +347,12 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
 
 	// give some time so that the configuration can be processed
 	time.Sleep(100 * time.Millisecond)
+	assert.Equal(t, 1, configurationReloads, "Same configuration should not be published multiple times")
 }
 
 func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
 
 	configuration := &dynamic.Configuration{
 		HTTP: th.BuildConfiguration(
@@ -204,7 +369,8 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
 	}
 
 	pvd := &mockProvider{
-		wait: 5 * time.Millisecond, // The last message needs to be received before the second has been fully processed
+		wait:             5 * time.Millisecond, // The last message needs to be received before the second has been fully processed
+		throttleDuration: 15 * time.Millisecond,
 		messages: []dynamic.Message{
 			{ProviderName: "mock", Configuration: configuration},
 			{ProviderName: "mock", Configuration: transientConfiguration},
@@ -212,7 +378,7 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
 		},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond, []string{"defaultEP"}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{"defaultEP"}, "")
 
 	var lastConfig dynamic.Configuration
 	watcher.AddListener(func(conf dynamic.Configuration) {
@@ -257,8 +423,245 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
 	assert.Equal(t, expected, lastConfig)
 }
 
+func TestListenProvidersIgnoreSameConfig(t *testing.T) {
+	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
+	configuration := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo")),
+			th.WithLoadBalancerServices(th.WithService("bar")),
+		),
+	}
+
+	transientConfiguration := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("bad")),
+			th.WithLoadBalancerServices(th.WithService("bad")),
+		),
+	}
+
+	// The transient configuration is sent alternately with the configuration we want to be applied.
+	// It is intended to show that even if the configurations are different,
+	// those transient configurations will be ignored if they are sent within a time frame
+	// shorter than the provider throttle duration.
+	pvd := &mockProvider{
+		wait:             1 * time.Microsecond, // Enqueue them fast
+		throttleDuration: time.Millisecond,
+		first:            make(chan struct{}),
+		messages: []dynamic.Message{
+			{ProviderName: "mock", Configuration: configuration},
+			{ProviderName: "mock", Configuration: transientConfiguration},
+			{ProviderName: "mock", Configuration: configuration},
+			{ProviderName: "mock", Configuration: transientConfiguration},
+			{ProviderName: "mock", Configuration: configuration},
+		},
+	}
+
+	providerAggregator := aggregator.ProviderAggregator{}
+	err := providerAggregator.AddProvider(pvd)
+	assert.Nil(t, err)
+
+	watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{"defaultEP"}, "")
+
+	var configurationReloads int
+	var lastConfig dynamic.Configuration
+	var once sync.Once
+	watcher.AddListener(func(conf dynamic.Configuration) {
+		configurationReloads++
+		lastConfig = conf
+
+		// Allows next configurations to be sent by the mock provider
+		// as soon as the first configuration message is applied.
+		once.Do(func() {
+			pvd.first <- struct{}{}
+			// Wait for all configuration messages to pile in
+			time.Sleep(5 * time.Millisecond)
+		})
+	})
+
+	watcher.Start()
+	defer watcher.Stop()
+
+	// Wait long enough
+	time.Sleep(50 * time.Millisecond)
+
+	expected := dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo@mock", th.WithEntryPoints("defaultEP"))),
+			th.WithLoadBalancerServices(th.WithService("bar@mock")),
+			th.WithMiddlewares(),
+		),
+		TCP: &dynamic.TCPConfiguration{
+			Routers:     map[string]*dynamic.TCPRouter{},
+			Middlewares: map[string]*dynamic.TCPMiddleware{},
+			Services:    map[string]*dynamic.TCPService{},
+		},
+		UDP: &dynamic.UDPConfiguration{
+			Routers:  map[string]*dynamic.UDPRouter{},
+			Services: map[string]*dynamic.UDPService{},
+		},
+		TLS: &dynamic.TLSConfiguration{
+			Options: map[string]tls.Options{
+				"default": {
+					ALPNProtocols: []string{
+						"h2",
+						"http/1.1",
+						"acme-tls/1",
+					},
+				},
+			},
+			Stores: map[string]tls.Store{},
+		},
+	}
+
+	assert.Equal(t, expected, lastConfig)
+
+	assert.Equal(t, 1, configurationReloads)
+}
+
+func TestApplyConfigUnderStress(t *testing.T) {
+	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
+	watcher := NewConfigurationWatcher(routinesPool, &mockProvider{}, []string{"defaultEP"}, "")
+
+	routinesPool.GoCtx(func(ctx context.Context) {
+		i := 0
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case watcher.allProvidersConfigs <- dynamic.Message{ProviderName: "mock", Configuration: &dynamic.Configuration{
+				HTTP: th.BuildConfiguration(
+					th.WithRouters(th.WithRouter("foo"+strconv.Itoa(i))),
+					th.WithLoadBalancerServices(th.WithService("bar")),
+				),
+			}}:
+			}
+			i++
+		}
+	})
+
+	var configurationReloads int
+	watcher.AddListener(func(conf dynamic.Configuration) {
+		configurationReloads++
+	})
+
+	watcher.Start()
+	defer watcher.Stop()
+
+	time.Sleep(100 * time.Millisecond)
+
+	// Ensure that at least two configurations have been applied
+	// if we simulate being spammed with configuration changes by the
+	// provider(s).
+	// In theory, checking for at least one would be sufficient, but
+	// checking for two also ensures that we're looping properly,
+	// and that the whole algorithm holds, etc.
+	t.Log(configurationReloads)
+	assert.GreaterOrEqual(t, configurationReloads, 2)
+}
+
+func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
+	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
+
+	configuration := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("foo")),
+			th.WithLoadBalancerServices(th.WithService("bar")),
+		),
+	}
+
+	transientConfiguration := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("bad")),
+			th.WithLoadBalancerServices(th.WithService("bad")),
+		),
+	}
+
+	transientConfiguration2 := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("bad2")),
+			th.WithLoadBalancerServices(th.WithService("bad2")),
+		),
+	}
+
+	finalConfiguration := &dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("final")),
+			th.WithLoadBalancerServices(th.WithService("final")),
+		),
+	}
+
+	pvd := &mockProvider{
+		wait:             10 * time.Microsecond, // Enqueue them fast
+		throttleDuration: 10 * time.Millisecond,
+		messages: []dynamic.Message{
+			{ProviderName: "mock", Configuration: configuration},
+			{ProviderName: "mock", Configuration: transientConfiguration},
+			{ProviderName: "mock", Configuration: transientConfiguration2},
+			{ProviderName: "mock", Configuration: finalConfiguration},
+		},
+	}
+
+	providerAggregator := aggregator.ProviderAggregator{}
+	err := providerAggregator.AddProvider(pvd)
+	assert.Nil(t, err)
+
+	watcher := NewConfigurationWatcher(routinesPool, providerAggregator, []string{"defaultEP"}, "")
+
+	var configurationReloads int
+	var lastConfig dynamic.Configuration
+	watcher.AddListener(func(conf dynamic.Configuration) {
+		configurationReloads++
+		lastConfig = conf
+	})
+
+	watcher.Start()
+	defer watcher.Stop()
+
+	// Wait long enough
+	time.Sleep(500 * time.Millisecond)
+
+	expected := dynamic.Configuration{
+		HTTP: th.BuildConfiguration(
+			th.WithRouters(th.WithRouter("final@mock", th.WithEntryPoints("defaultEP"))),
+			th.WithLoadBalancerServices(th.WithService("final@mock")),
+			th.WithMiddlewares(),
+		),
+		TCP: &dynamic.TCPConfiguration{
+			Routers:     map[string]*dynamic.TCPRouter{},
+			Middlewares: map[string]*dynamic.TCPMiddleware{},
+			Services:    map[string]*dynamic.TCPService{},
+		},
+		UDP: &dynamic.UDPConfiguration{
+			Routers:  map[string]*dynamic.UDPRouter{},
+			Services: map[string]*dynamic.UDPService{},
+		},
+		TLS: &dynamic.TLSConfiguration{
+			Options: map[string]tls.Options{
+				"default": {
+					ALPNProtocols: []string{
+						"h2",
+						"http/1.1",
+						"acme-tls/1",
+					},
+				},
+			},
+			Stores: map[string]tls.Store{},
+		},
+	}
+
+	assert.Equal(t, expected, lastConfig)
+
+	assert.Equal(t, 2, configurationReloads)
+}
+
 func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
 
 	configuration := &dynamic.Configuration{
 		HTTP: th.BuildConfiguration(
@@ -274,7 +677,7 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
 		},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{"defaultEP"}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{"defaultEP"}, "")
 
 	var publishedProviderConfig dynamic.Configuration
 
@@ -294,7 +697,10 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
 			th.WithRouter("foo@mock", th.WithEntryPoints("defaultEP")),
 			th.WithRouter("foo@mock2", th.WithEntryPoints("defaultEP")),
 		),
-		th.WithLoadBalancerServices(th.WithService("bar@mock"), th.WithService("bar@mock2")),
+		th.WithLoadBalancerServices(
+			th.WithService("bar@mock"),
+			th.WithService("bar@mock2"),
+		),
 		th.WithMiddlewares(),
 	),
 	TCP: &dynamic.TCPConfiguration{
@@ -325,6 +731,7 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
 
 func TestPublishConfigUpdatedByProvider(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
 
 	pvdConfiguration := dynamic.Configuration{
 		TCP: &dynamic.TCPConfiguration{
@@ -348,7 +755,7 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) {
 		},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
 
 	publishedConfigCount := 0
 	watcher.AddListener(func(configuration dynamic.Configuration) {
@@ -369,6 +776,7 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) {
 
 func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
 	routinesPool := safe.NewPool(context.Background())
+	defer routinesPool.Stop()
 
 	pvd := &mockProvider{
 		wait: 10 * time.Millisecond,
@@ -396,7 +804,7 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
 		},
 	}
 
-	watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "")
+	watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
 
 	publishedConfigCount := 0
 	watcher.AddListener(func(configuration dynamic.Configuration) {