From 9f6484a328127189009d83dd2758115ccc4529c1 Mon Sep 17 00:00:00 2001 From: Parham Negahdar Date: Tue, 12 Jul 2016 01:25:01 -0400 Subject: [PATCH] Fixes #363: Allow for kubernetes label selectors --- configuration.go | 1 + docs/toml.md | 4 ++ provider/k8s/client.go | 88 +++++++++++++++++++++++-------------- provider/kubernetes.go | 8 ++-- provider/kubernetes_test.go | 6 +-- traefik.sample.toml | 9 ++-- 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/configuration.go b/configuration.go index eab6315ac..cb4b17f9a 100644 --- a/configuration.go +++ b/configuration.go @@ -273,6 +273,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration { var defaultKubernetes provider.Kubernetes defaultKubernetes.Watch = true defaultKubernetes.Endpoint = "" + defaultKubernetes.LabelSelector = "" defaultKubernetes.Constraints = []types.Constraint{} defaultConfiguration := GlobalConfiguration{ diff --git a/docs/toml.md b/docs/toml.md index b3106aacf..2f12347b4 100644 --- a/docs/toml.md +++ b/docs/toml.md @@ -665,6 +665,10 @@ Træfɪk can be configured to use Kubernetes Ingress as a backend configuration: # # endpoint = "http://localhost:8080" # namespaces = ["default","production"] +# +# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering +# labelselector = "A and not B" +# ``` Annotations can be used on containers to override default behaviour for the whole Ingress resource: diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 0b2195615..b120c8bb4 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -5,6 +5,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + log "github.com/Sirupsen/logrus" "github.com/parnurzeal/gorequest" "net/http" "net/url" @@ -21,10 +22,10 @@ const ( // Client is a client for the Kubernetes master. type Client interface { - GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) + GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error) GetService(name, namespace string) (Service, error) GetEndpoints(name, namespace string) (Endpoints, error) - WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) + WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) } type clientImpl struct { @@ -50,11 +51,26 @@ func NewClient(baseURL string, caCert []byte, token string) (Client, error) { }, nil } -// GetIngresses returns all ingresses in the cluster -func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) { - getURL := c.endpointURL + extentionsEndpoint + defaultIngress +func makeQueryString(baseParams map[string]string, labelSelector string) (string, error) { + if labelSelector != "" { + baseParams["labelSelector"] = labelSelector + } + queryData, err := json.Marshal(baseParams) + if err != nil { + return "", err + } + return string(queryData), nil +} - body, err := c.do(c.request(getURL)) +// GetIngresses returns all ingresses in the cluster +func (c *clientImpl) GetIngresses(labelSelector string, predicate func(Ingress) bool) ([]Ingress, error) { + getURL := c.endpointURL + extentionsEndpoint + defaultIngress + queryParams := map[string]string{} + queryData, err := makeQueryString(queryParams, labelSelector) + if err != nil { + return nil, fmt.Errorf("Had problems constructing query string %s : %v", queryParams, err) + } + body, err := c.do(c.request(getURL, queryData)) if err != nil { return nil, fmt.Errorf("failed to create ingresses request: GET %q : %v", getURL, err) } @@ -73,16 +89,16 @@ func (c *clientImpl) GetIngresses(predicate func(Ingress) bool) ([]Ingress, erro } // WatchIngresses returns all ingresses in the cluster -func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c *clientImpl) WatchIngresses(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { getURL := c.endpointURL + extentionsEndpoint + defaultIngress - return c.watch(getURL, stopCh) + return c.watch(getURL, labelSelector, stopCh) } // GetService returns the named service from the named namespace func (c *clientImpl) GetService(name, namespace string) (Service, error) { getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name - body, err := c.do(c.request(getURL)) + body, err := c.do(c.request(getURL, "")) if err != nil { return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) } @@ -95,9 +111,9 @@ func (c *clientImpl) GetService(name, namespace string) (Service, error) { } // WatchServices returns all services in the cluster -func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c *clientImpl) WatchServices(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { getURL := c.endpointURL + APIEndpoint + "/services" - return c.watch(getURL, stopCh) + return c.watch(getURL, labelSelector, stopCh) } // GetEndpoints returns the named Endpoints @@ -105,7 +121,7 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name - body, err := c.do(c.request(getURL)) + body, err := c.do(c.request(getURL, "")) if err != nil { return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err) } @@ -118,28 +134,28 @@ func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { } // WatchEndpoints returns endpoints in the cluster -func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c *clientImpl) WatchEndpoints(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { getURL := c.endpointURL + APIEndpoint + "/endpoints" - return c.watch(getURL, stopCh) + return c.watch(getURL, labelSelector, stopCh) } // WatchAll returns events in the cluster -func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { watchCh := make(chan interface{}, 10) errCh := make(chan error, 10) stopIngresses := make(chan bool) - chanIngresses, chanIngressesErr, err := c.WatchIngresses(stopIngresses) + chanIngresses, chanIngressesErr, err := c.WatchIngresses(labelSelector, stopIngresses) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } stopServices := make(chan bool) - chanServices, chanServicesErr, err := c.WatchServices(stopServices) + chanServices, chanServicesErr, err := c.WatchServices(labelSelector, stopServices) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } stopEndpoints := make(chan bool) - chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) + chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(labelSelector, stopEndpoints) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } @@ -188,22 +204,26 @@ func (c *clientImpl) do(request *gorequest.SuperAgent) ([]byte, error) { return body, nil } -func (c *clientImpl) request(url string) *gorequest.SuperAgent { +func (c *clientImpl) request(reqURL string, queryContent interface{}) *gorequest.SuperAgent { // Make request to Kubernetes API - request := gorequest.New().Get(url) + parsedURL, parseErr := url.Parse(reqURL) + if parseErr != nil { + log.Errorf("Had issues parsing url %s. Trying anyway.", reqURL) + } + request := gorequest.New().Get(reqURL) request.Transport.DisableKeepAlives = true - if strings.HasPrefix(url, "http://") { - return request - } - - if len(c.token) > 0 { - request.Header["Authorization"] = "Bearer " + c.token + if parsedURL.Scheme == "https" { pool := x509.NewCertPool() pool.AppendCertsFromPEM(c.caCert) c.tls = &tls.Config{RootCAs: pool} + request.TLSClientConfig(c.tls) } - return request.TLSClientConfig(c.tls) + if len(c.token) > 0 { + request.Header["Authorization"] = "Bearer " + c.token + } + request.Query(queryContent) + return request } // GenericObject generic object @@ -212,12 +232,12 @@ type GenericObject struct { ListMeta `json:"metadata,omitempty"` } -func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c *clientImpl) watch(url string, labelSelector string, stopCh <-chan bool) (chan interface{}, chan error, error) { watchCh := make(chan interface{}, 10) errCh := make(chan error, 10) // get version - body, err := c.do(c.request(url)) + body, err := c.do(c.request(url, "")) if err != nil { return watchCh, errCh, fmt.Errorf("failed to do version request: GET %q : %v", url, err) } @@ -227,10 +247,12 @@ func (c *clientImpl) watch(url string, stopCh <-chan bool) (chan interface{}, ch return watchCh, errCh, fmt.Errorf("failed to decode version %v", err) } resourceVersion := generic.ResourceVersion - - url = url + "?watch&resourceVersion=" + resourceVersion - // Make request to Kubernetes API - request := c.request(url) + queryParams := map[string]string{"watch": "", "resourceVersion": resourceVersion} + queryData, err := makeQueryString(queryParams, labelSelector) + if err != nil { + return watchCh, errCh, fmt.Errorf("Unable to construct query args") + } + request := c.request(url, queryData) req, err := request.MakeRequest() if err != nil { return watchCh, errCh, fmt.Errorf("failed to make watch request: GET %q : %v", url, err) diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 078939a81..0535b9570 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -20,7 +20,7 @@ import ( const ( serviceAccountToken = "/var/run/secrets/kubernetes.io/serviceaccount/token" serviceAccountCACert = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - defaultKubeEndpoint = "http://127.0.0.1:8080" + defaultKubeEndpoint = "http://127.0.0.1:8080" ) // Namespaces holds kubernetes namespaces @@ -55,6 +55,7 @@ type Kubernetes struct { Endpoint string `description:"Kubernetes server endpoint"` DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"` Namespaces Namespaces `description:"Kubernetes namespaces"` + LabelSelector string `description:"Kubernetes api label selector to use"` lastConfiguration safe.Safe } @@ -103,7 +104,8 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage for { stopWatch := make(chan bool, 5) defer close(stopWatch) - eventsChan, errEventsChan, err := k8sClient.WatchAll(stopWatch) + log.Debugf("Using lable selector: %s", provider.LabelSelector) + eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch) if err != nil { log.Errorf("Error watching kubernetes events: %v", err) timer := time.NewTimer(1 * time.Second) @@ -174,7 +176,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage } func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configuration, error) { - ingresses, err := k8sClient.GetIngresses(func(ingress k8s.Ingress) bool { + ingresses, err := k8sClient.GetIngresses(provider.LabelSelector, func(ingress k8s.Ingress) bool { if len(provider.Namespaces) == 0 { return true } diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 25e48b8d7..0f0ee5ca1 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -1250,7 +1250,7 @@ type clientMock struct { watchChan chan interface{} } -func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) { +func (c clientMock) GetIngresses(labelString string, predicate func(k8s.Ingress) bool) ([]k8s.Ingress, error) { var ingresses []k8s.Ingress for _, ingress := range c.ingresses { if predicate(ingress) { @@ -1259,7 +1259,7 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres } return ingresses, nil } -func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c clientMock) WatchIngresses(labelString string, predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } func (c clientMock) GetService(name, namespace string) (k8s.Service, error) { @@ -1280,6 +1280,6 @@ func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) return k8s.Endpoints{}, nil } -func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { +func (c clientMock) WatchAll(labelString string, stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } diff --git a/traefik.sample.toml b/traefik.sample.toml index 88f610cad..38803498c 100644 --- a/traefik.sample.toml +++ b/traefik.sample.toml @@ -75,7 +75,7 @@ # storageFile = "acme.json" # Entrypoint to proxy acme challenge to. -# WARNING, must point to an entrypoint on port 443 +# WARNING, must point to an entrypoint on port 443 # # Required # @@ -340,11 +340,14 @@ # and KUBERNETES_SERVICE_PORT_HTTPS as endpoint # Secure token will be found in /var/run/secrets/kubernetes.io/serviceaccount/token # and SSL CA cert in /var/run/secrets/kubernetes.io/serviceaccount/ca.crt -# +# # Optional # # endpoint = "http://localhost:8080" # namespaces = ["default"] +# +# See: http://kubernetes.io/docs/user-guide/labels/#list-and-watch-filtering +# labelselector = "A and not B" ################################################################ # Consul KV configuration backend @@ -545,4 +548,4 @@ # [frontends.frontend3] # entrypoints = ["http", "https"] # overrides defaultEntryPoints # backend = "backend2" -# rule = "Path: /test, /other" \ No newline at end of file +# rule = "Path: /test, /other"