diff --git a/docs/content/providers/kubernetes-crd.md b/docs/content/providers/kubernetes-crd.md index 637024e8b..653584187 100644 --- a/docs/content/providers/kubernetes-crd.md +++ b/docs/content/providers/kubernetes-crd.md @@ -168,6 +168,27 @@ Value of `kubernetes.io/ingress.class` annotation that identifies Ingress object If the parameter is non-empty, only Ingresses containing an annotation with the same value are processed. Otherwise, Ingresses missing the annotation, having an empty value, or the value `traefik` are processed. +### `throttleDuration` + +_Optional, Default: 0 (no throttling)_ + +```toml tab="File (TOML)" +[providers.kubernetesCRD] + throttleDuration = "10s" + # ... +``` + +```yaml tab="File (YAML)" +providers: + kubernetesCRD: + throttleDuration: "10s" + # ... +``` + +```bash tab="CLI" +--providers.kubernetescrd.throttleDuration="10s" +``` + ## Resource Configuration If you're in a hurry, maybe you'd rather go through the [dynamic](../reference/dynamic-configuration/kubernetes-crd.md) configuration reference. diff --git a/docs/content/providers/kubernetes-ingress.md b/docs/content/providers/kubernetes-ingress.md index ca0978d44..a8819ae98 100644 --- a/docs/content/providers/kubernetes-ingress.md +++ b/docs/content/providers/kubernetes-ingress.md @@ -305,6 +305,27 @@ providers: Published Kubernetes Service to copy status from. +### `throttleDuration` + +_Optional, Default: 0 (no throttling)_ + +```toml tab="File (TOML)" +[providers.kubernetesIngress] + throttleDuration = "10s" + # ... +``` + +```yaml tab="File (YAML)" +providers: + kubernetesIngress: + throttleDuration: "10s" + # ... +``` + +```bash tab="CLI" +--providers.kubernetesingress.throttleDuration="10s" +``` + ## Further If one wants to know more about the various aspects of the Ingress spec that Traefik supports, many examples of Ingresses definitions are located in the tests [data](https://github.com/containous/traefik/tree/v2.0/pkg/provider/kubernetes/ingress/fixtures) of the Traefik repository. diff --git a/docs/content/reference/static-configuration/cli-ref.md b/docs/content/reference/static-configuration/cli-ref.md index 62514adab..c5c813407 100644 --- a/docs/content/reference/static-configuration/cli-ref.md +++ b/docs/content/reference/static-configuration/cli-ref.md @@ -46,7 +46,7 @@ Activate dashboard. (Default: ```true```) Enable additional endpoints for debugging and profiling. (Default: ```false```) `--api.insecure`: -Activate API on an insecure entryPoints named traefik. (Default: ```false```) +Activate API directly on the entryPoint named traefik. (Default: ```false```) `--certificatesresolvers.`: Certificates resolvers configuration. (Default: ```false```) @@ -312,6 +312,9 @@ Kubernetes label selector to use. `--providers.kubernetescrd.namespaces`: Kubernetes namespaces. +`--providers.kubernetescrd.throttleduration`: +Ingress refresh throttle duration (Default: ```0```) + `--providers.kubernetescrd.token`: Kubernetes bearer token (not needed for in-cluster client). @@ -345,6 +348,9 @@ Kubernetes Ingress label selector to use. `--providers.kubernetesingress.namespaces`: Kubernetes namespaces. +`--providers.kubernetesingress.throttleduration`: +Ingress refresh throttle duration (Default: ```0```) + `--providers.kubernetesingress.token`: Kubernetes bearer token (not needed for in-cluster client). @@ -445,7 +451,7 @@ Watch provider. (Default: ```true```) Enable Rest backend with default settings. (Default: ```false```) `--providers.rest.insecure`: -Activate REST Provider on an insecure entryPoints named traefik. (Default: ```false```) +Activate REST Provider directly on the entryPoint named traefik. (Default: ```false```) `--serverstransport.forwardingtimeouts.dialtimeout`: The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```) diff --git a/docs/content/reference/static-configuration/env-ref.md b/docs/content/reference/static-configuration/env-ref.md index 14da23d49..a223da524 100644 --- a/docs/content/reference/static-configuration/env-ref.md +++ b/docs/content/reference/static-configuration/env-ref.md @@ -46,7 +46,7 @@ Activate dashboard. (Default: ```true```) Enable additional endpoints for debugging and profiling. (Default: ```false```) `TRAEFIK_API_INSECURE`: -Activate API on an insecure entryPoints named traefik. (Default: ```false```) +Activate API directly on the entryPoint named traefik. (Default: ```false```) `TRAEFIK_CERTIFICATESRESOLVERS_`: Certificates resolvers configuration. (Default: ```false```) @@ -312,6 +312,9 @@ Kubernetes label selector to use. `TRAEFIK_PROVIDERS_KUBERNETESCRD_NAMESPACES`: Kubernetes namespaces. +`TRAEFIK_PROVIDERS_KUBERNETESCRD_THROTTLEDURATION`: +Ingress refresh throttle duration (Default: ```0```) + `TRAEFIK_PROVIDERS_KUBERNETESCRD_TOKEN`: Kubernetes bearer token (not needed for in-cluster client). @@ -345,6 +348,9 @@ Kubernetes Ingress label selector to use. `TRAEFIK_PROVIDERS_KUBERNETESINGRESS_NAMESPACES`: Kubernetes namespaces. +`TRAEFIK_PROVIDERS_KUBERNETESINGRESS_THROTTLEDURATION`: +Ingress refresh throttle duration (Default: ```0```) + `TRAEFIK_PROVIDERS_KUBERNETESINGRESS_TOKEN`: Kubernetes bearer token (not needed for in-cluster client). @@ -445,7 +451,7 @@ Watch provider. (Default: ```true```) Enable Rest backend with default settings. (Default: ```false```) `TRAEFIK_PROVIDERS_REST_INSECURE`: -Activate REST Provider on an insecure entryPoints named traefik. (Default: ```false```) +Activate REST Provider directly on the entryPoint named traefik. (Default: ```false```) `TRAEFIK_SERVERSTRANSPORT_FORWARDINGTIMEOUTS_DIALTIMEOUT`: The amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists. (Default: ```30```) diff --git a/docs/content/reference/static-configuration/file.toml b/docs/content/reference/static-configuration/file.toml index 4f4542f63..3c314572c 100644 --- a/docs/content/reference/static-configuration/file.toml +++ b/docs/content/reference/static-configuration/file.toml @@ -83,6 +83,7 @@ namespaces = ["foobar", "foobar"] labelSelector = "foobar" ingressClass = "foobar" + throttleDuration = "10s" [providers.kubernetesIngress.ingressEndpoint] ip = "foobar" hostname = "foobar" @@ -95,6 +96,7 @@ namespaces = ["foobar", "foobar"] labelSelector = "foobar" ingressClass = "foobar" + throttleDuration = "10s" [providers.rest] insecure = true [providers.rancher] diff --git a/docs/content/reference/static-configuration/file.yaml b/docs/content/reference/static-configuration/file.yaml index 6080d9037..e55eac104 100644 --- a/docs/content/reference/static-configuration/file.yaml +++ b/docs/content/reference/static-configuration/file.yaml @@ -88,6 +88,7 @@ providers: - foobar labelSelector: foobar ingressClass: foobar + throttleDuration: 10s ingressEndpoint: ip: foobar hostname: foobar @@ -102,6 +103,7 @@ providers: - foobar labelSelector: foobar ingressClass: foobar + throttleDuration: 10s rest: insecure: true rancher: diff --git a/pkg/provider/kubernetes/crd/kubernetes.go b/pkg/provider/kubernetes/crd/kubernetes.go index c35a82ead..4371b5e34 100644 --- a/pkg/provider/kubernetes/crd/kubernetes.go +++ b/pkg/provider/kubernetes/crd/kubernetes.go @@ -20,6 +20,7 @@ import ( "github.com/containous/traefik/v2/pkg/provider/kubernetes/crd/traefik/v1alpha1" "github.com/containous/traefik/v2/pkg/safe" "github.com/containous/traefik/v2/pkg/tls" + "github.com/containous/traefik/v2/pkg/types" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" ) @@ -31,13 +32,14 @@ const ( // Provider holds configurations of the provider. type Provider struct { - Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)." json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty"` - Token string `description:"Kubernetes bearer token (not needed for in-cluster client)." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty"` - CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)." json:"certAuthFilePath,omitempty" toml:"certAuthFilePath,omitempty" yaml:"certAuthFilePath,omitempty"` - DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers." json:"disablePassHostHeaders,omitempty" toml:"disablePassHostHeaders,omitempty" yaml:"disablePassHostHeaders,omitempty" export:"true"` - Namespaces []string `description:"Kubernetes namespaces." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" export:"true"` - LabelSelector string `description:"Kubernetes label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"` - IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"` + Endpoint string `description:"Kubernetes server endpoint (required for external cluster client)." json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty"` + Token string `description:"Kubernetes bearer token (not needed for in-cluster client)." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty"` + CertAuthFilePath string `description:"Kubernetes certificate authority file path (not needed for in-cluster client)." json:"certAuthFilePath,omitempty" toml:"certAuthFilePath,omitempty" yaml:"certAuthFilePath,omitempty"` + DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers." json:"disablePassHostHeaders,omitempty" toml:"disablePassHostHeaders,omitempty" yaml:"disablePassHostHeaders,omitempty" export:"true"` + Namespaces []string `description:"Kubernetes namespaces." json:"namespaces,omitempty" toml:"namespaces,omitempty" yaml:"namespaces,omitempty" export:"true"` + LabelSelector string `description:"Kubernetes label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"` + IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"` + ThrottleDuration types.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty"` lastConfiguration safe.Safe } @@ -95,6 +97,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. stopWatch := make(chan struct{}, 1) defer close(stopWatch) eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch) + if err != nil { logger.Errorf("Error watching kubernetes events: %v", err) timer := time.NewTimer(1 * time.Second) @@ -105,11 +108,20 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. return nil } } + + throttleDuration := time.Duration(p.ThrottleDuration) + eventsChanToRead := throttleEvents(ctxLog, throttleDuration, stop, eventsChan) + for { select { case <-stop: return nil - case event := <-eventsChan: + case event := <-eventsChanToRead: + // Note that event is the *first* event that came in during this + // throttling interval -- if we're hitting our throttle, we may have + // dropped events. This is fine, because we don't treat different + // event types differently. But if we do in the future, we'll need to + // track more information about the dropped events. conf := p.loadConfigurationFromCRD(ctxLog, k8sClient) if reflect.DeepEqual(p.lastConfiguration.Get(), conf) { @@ -121,6 +133,11 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Configuration: conf, } } + + // If we're throttling, we sleep here for the throttle duration to + // enforce that we don't refresh faster than our throttle. time.Sleep + // returns immediately if p.ThrottleDuration is 0 (no throttle). + time.Sleep(throttleDuration) } } } @@ -587,3 +604,36 @@ func getCABlocks(secret *corev1.Secret, namespace, secretName string) (string, e return cert, nil } + +func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} { + if throttleDuration == 0 { + return nil + } + // Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling) + eventsChanBuffered := make(chan interface{}, 1) + + // Run a goroutine that reads events from eventChan and does a + // non-blocking write to pendingEvent. This guarantees that writing to + // eventChan will never block, and that pendingEvent will have + // something in it if there's been an event since we read from that channel. + go func() { + for { + select { + case <-stop: + return + case nextEvent := <-eventsChan: + select { + case eventsChanBuffered <- nextEvent: + default: + // We already have an event in eventsChanBuffered, so we'll + // do a refresh as soon as our throttle allows us to. It's fine + // to drop the event and keep whatever's in the buffer -- we + // don't do different things for different events + log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent) + } + } + } + }() + + return eventsChanBuffered +} diff --git a/pkg/provider/kubernetes/ingress/kubernetes.go b/pkg/provider/kubernetes/ingress/kubernetes.go index 8f17bf174..8e265a067 100644 --- a/pkg/provider/kubernetes/ingress/kubernetes.go +++ b/pkg/provider/kubernetes/ingress/kubernetes.go @@ -18,6 +18,7 @@ import ( "github.com/containous/traefik/v2/pkg/log" "github.com/containous/traefik/v2/pkg/safe" "github.com/containous/traefik/v2/pkg/tls" + "github.com/containous/traefik/v2/pkg/types" corev1 "k8s.io/api/core/v1" "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -39,6 +40,7 @@ type Provider struct { LabelSelector string `description:"Kubernetes Ingress label selector to use." json:"labelSelector,omitempty" toml:"labelSelector,omitempty" yaml:"labelSelector,omitempty" export:"true"` IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"` IngressEndpoint *EndpointIngress `description:"Kubernetes Ingress Endpoint." json:"ingressEndpoint,omitempty" toml:"ingressEndpoint,omitempty" yaml:"ingressEndpoint,omitempty"` + ThrottleDuration types.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty"` lastConfiguration safe.Safe } @@ -118,11 +120,19 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. } } + throttleDuration := time.Duration(p.ThrottleDuration) + eventsChanToRead := throttleEvents(ctxLog, throttleDuration, stop, eventsChan) + for { select { case <-stop: return nil - case event := <-eventsChan: + case event := <-eventsChanToRead: + // Note that event is the *first* event that came in during this + // throttling interval -- if we're hitting our throttle, we may have + // dropped events. This is fine, because we don't treat different + // event types differently. But if we do in the future, we'll need to + // track more information about the dropped events. conf := p.loadConfigurationFromIngresses(ctxLog, k8sClient) if reflect.DeepEqual(p.lastConfiguration.Get(), conf) { @@ -134,6 +144,11 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe. Configuration: conf, } } + + // If we're throttling, we sleep here for the throttle duration to + // enforce that we don't refresh faster than our throttle. time.Sleep + // returns immediately if p.ThrottleDuration is 0 (no throttle). + time.Sleep(throttleDuration) } } } @@ -478,3 +493,36 @@ func (p *Provider) updateIngressStatus(i *v1beta1.Ingress, k8sClient Client) err return k8sClient.UpdateIngressStatus(i.Namespace, i.Name, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname) } + +func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} { + if throttleDuration == 0 { + return nil + } + // Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling) + eventsChanBuffered := make(chan interface{}, 1) + + // Run a goroutine that reads events from eventChan and does a + // non-blocking write to pendingEvent. This guarantees that writing to + // eventChan will never block, and that pendingEvent will have + // something in it if there's been an event since we read from that channel. + go func() { + for { + select { + case <-stop: + return + case nextEvent := <-eventsChan: + select { + case eventsChanBuffered <- nextEvent: + default: + // We already have an event in eventsChanBuffered, so we'll + // do a refresh as soon as our throttle allows us to. It's fine + // to drop the event and keep whatever's in the buffer -- we + // don't do different things for different events + log.FromContext(ctx).Debugf("Dropping event kind %T due to throttling", nextEvent) + } + } + } + }() + + return eventsChanBuffered +}