From e948a013cdf866caa9d0b14366d74fd575a6fffe Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Fri, 20 May 2016 17:34:57 +0100 Subject: [PATCH 1/3] Build backend config using the K8S endpoint resource. * Potentialy saves a network hop * Ability to configure LB algothim (given some work to expose an anotation etc...) * K8s config Watch is triggered far less often --- provider/k8s/client.go | 55 +++++++++++------------- provider/k8s/endpoints.go | 84 +++++++++++++++++++++++++++++++++++++ provider/kubernetes.go | 38 +++++++++++++++-- provider/kubernetes_test.go | 75 +++++++++++++++++++++++++++++---- 4 files changed, 210 insertions(+), 42 deletions(-) create mode 100644 provider/k8s/endpoints.go diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 957d66a9b..930c9835e 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -22,6 +22,7 @@ const ( type Client interface { GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) GetServices(predicate func(Service) bool) ([]Service, error) + GetEndpoints(name, namespace string) (Endpoints, error) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) } @@ -104,21 +105,26 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e return c.watch(getURL, stopCh) } -// WatchEvents returns events in the cluster -func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/events" - return c.watch(getURL, stopCh) +// GetEndpoints returns the named Endpoints +// Endpoints have the same name as the coresponding service +func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { + getURL := c.endpointURL + APIEndpoint + "/namespaces/" + namespace + "/endpoints/" + name + + body, err := c.do(c.request(getURL)) + if err != nil { + return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err) + } + + var endpoints Endpoints + if err := json.Unmarshal(body, &endpoints); err != nil { + return Endpoints{}, fmt.Errorf("failed to decode endpoints resources: %v", err) + } + return endpoints, nil } -// WatchPods returns pods in the cluster -func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/pods" - return c.watch(getURL, stopCh) -} - -// WatchReplicationControllers returns ReplicationControllers in the cluster -func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers" +// WatchEndpoints returns endpoints in the cluster +func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/endpoints" return c.watch(getURL, stopCh) } @@ -137,13 +143,8 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } - stopPods := make(chan bool) - chanPods, chanPodsErr, err := c.WatchPods(stopPods) - if err != nil { - return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) - } - stopReplicationControllers := make(chan bool) - chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers) + stopEndpoints := make(chan bool) + chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } @@ -152,32 +153,26 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, defer close(errCh) defer close(stopIngresses) defer close(stopServices) - defer close(stopPods) - defer close(stopReplicationControllers) + defer close(stopEndpoints) for { select { case <-stopCh: stopIngresses <- true stopServices <- true - stopPods <- true - stopReplicationControllers <- true + stopEndpoints <- true return case err := <-chanIngressesErr: errCh <- err case err := <-chanServicesErr: errCh <- err - case err := <-chanPodsErr: - errCh <- err - case err := <-chanReplicationControllersErr: + case err := <-chanEndpointsErr: errCh <- err case event := <-chanIngresses: watchCh <- event case event := <-chanServices: watchCh <- event - case event := <-chanPods: - watchCh <- event - case event := <-chanReplicationControllers: + case event := <-chanEndpoints: watchCh <- event } } diff --git a/provider/k8s/endpoints.go b/provider/k8s/endpoints.go new file mode 100644 index 000000000..123ffe36c --- /dev/null +++ b/provider/k8s/endpoints.go @@ -0,0 +1,84 @@ +package k8s + +// Endpoints is a collection of endpoints that implement the actual service. Example: +// Name: "mysvc", +// Subsets: [ +// { +// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], +// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] +// }, +// { +// Addresses: [{"ip": "10.10.3.3"}], +// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}] +// }, +// ] +type Endpoints struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata,omitempty"` + + // The set of all endpoints is the union of all subsets. + Subsets []EndpointSubset +} + +// EndpointSubset is a group of addresses with a common set of ports. The +// expanded set of endpoints is the Cartesian product of Addresses x Ports. +// For example, given: +// { +// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], +// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] +// } +// The resulting set of endpoints can be viewed as: +// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ], +// b: [ 10.10.1.1:309, 10.10.2.2:309 ] +type EndpointSubset struct { + Addresses []EndpointAddress + NotReadyAddresses []EndpointAddress + Ports []EndpointPort +} + +// EndpointAddress is a tuple that describes single IP address. +type EndpointAddress struct { + // The IP of this endpoint. + // IPv6 is also accepted but not fully supported on all platforms. Also, certain + // kubernetes components, like kube-proxy, are not IPv6 ready. + // TODO: This should allow hostname or IP, see #4447. + IP string + // Optional: Hostname of this endpoint + // Meant to be used by DNS servers etc. + Hostname string `json:"hostname,omitempty"` + // Optional: The kubernetes object related to the entry point. + TargetRef *ObjectReference +} + +// EndpointPort is a tuple that describes a single port. +type EndpointPort struct { + // The name of this port (corresponds to ServicePort.Name). Optional + // if only one port is defined. Must be a DNS_LABEL. + Name string + + // The port number. + Port int32 + + // The IP protocol for this port. + Protocol Protocol +} + +// ObjectReference contains enough information to let you inspect or modify the referred object. +type ObjectReference struct { + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + UID UID `json:"uid,omitempty"` + APIVersion string `json:"apiVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + + // Optional. If referring to a piece of an object instead of an entire object, this string + // should contain information to identify the sub-object. For example, if the object + // reference is to a container within a pod, this would take on a value like: + // "spec.containers{name}" (where "name" refers to the name of the container that triggered + // the event) or if no container name is specified "spec.containers[2]" (container with + // index 2 in this pod). This syntax is chosen only to have some well-defined way of + // referencing a part of an object. + // TODO: this design is not final and this field is subject to change in the future. + FieldPath string `json:"fieldPath,omitempty"` +} diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 7362898b2..11556f49c 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -209,9 +209,27 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur if port.Port == 443 { protocol = "https" } - templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ - URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), - Weight: 1, + endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace) + if err != nil { + log.Errorf("Error retrieving endpoints: %v", err) + continue + } + if len(endpoints.Subsets) == 0 { + log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) + templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ + URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), + Weight: 1, + } + } else { + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) + templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{ + URL: url, + Weight: 1, + } + } + } } break } @@ -223,6 +241,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur return &templateObjects, nil } +func endpointPortNumber(servicePort k8s.ServicePort, endpointPorts []k8s.EndpointPort) int { + if len(endpointPorts) > 0 { + //name is optional if there is only one port + port := endpointPorts[0] + for _, endpointPort := range endpointPorts { + if servicePort.Name == endpointPort.Name { + port = endpointPort + } + } + return int(port.Port) + } + return servicePort.Port +} + func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool { if servicePort.Port == ingressPort.IntValue() { return true diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index b74b0536d..9f6ddfb32 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -10,6 +10,9 @@ import ( func TestLoadIngresses(t *testing.T) { ingresses := []k8s.Ingress{{ + ObjectMeta: k8s.ObjectMeta{ + Namespace: "testing", + }, Spec: k8s.IngressSpec{ Rules: []k8s.IngressRule{ { @@ -55,23 +58,25 @@ func TestLoadIngresses(t *testing.T) { services := []k8s.Service{ { ObjectMeta: k8s.ObjectMeta{ - Name: "service1", - UID: "1", + Name: "service1", + UID: "1", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.1", Ports: []k8s.ServicePort{ { Name: "http", - Port: 801, + Port: 80, }, }, }, }, { ObjectMeta: k8s.ObjectMeta{ - Name: "service2", - UID: "2", + Name: "service2", + UID: "2", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.2", @@ -84,8 +89,9 @@ func TestLoadIngresses(t *testing.T) { }, { ObjectMeta: k8s.ObjectMeta{ - Name: "service3", - UID: "3", + Name: "service3", + UID: "3", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.3", @@ -98,10 +104,46 @@ func TestLoadIngresses(t *testing.T) { }, }, } + endpoints := []k8s.Endpoints{ + { + ObjectMeta: k8s.ObjectMeta{ + Name: "service1", + UID: "1", + Namespace: "testing", + }, + Subsets: []k8s.EndpointSubset{ + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.10.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Port: 8080, + }, + }, + }, + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.21.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Port: 8080, + }, + }, + }, + }, + }, + } watchChan := make(chan interface{}) client := clientMock{ ingresses: ingresses, services: services, + endpoints: endpoints, watchChan: watchChan, } provider := Kubernetes{} @@ -114,8 +156,12 @@ func TestLoadIngresses(t *testing.T) { Backends: map[string]*types.Backend{ "foo/bar": { Servers: map[string]types.Server{ - "1": { - URL: "http://10.0.0.1:801", + "http://10.10.0.1:8080": { + URL: "http://10.10.0.1:8080", + Weight: 1, + }, + "http://10.21.0.1:8080": { + URL: "http://10.21.0.1:8080", Weight: 1, }, }, @@ -1150,6 +1196,7 @@ func TestHostlessIngress(t *testing.T) { type clientMock struct { ingresses []k8s.Ingress services []k8s.Service + endpoints []k8s.Endpoints watchChan chan interface{} } @@ -1174,6 +1221,16 @@ func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service } return services, nil } + +func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) { + for _, endpoints := range c.endpoints { + if endpoints.Namespace == namespace && endpoints.Name == name { + return endpoints, nil + } + } + return k8s.Endpoints{}, nil +} + func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } From 6accb90c4769f9de24a73ddf7f56e3058cc460d6 Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Thu, 26 May 2016 00:53:51 +0100 Subject: [PATCH 2/3] Simplify Service Lookup Since we already know the name and namespace of the service(s) we want we can just get the correct one back from the API without filtering the results. --- provider/k8s/client.go | 27 +++++++---------- provider/kubernetes.go | 59 ++++++++++++++++--------------------- provider/kubernetes_test.go | 9 +++--- 3 files changed, 41 insertions(+), 54 deletions(-) diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 930c9835e..0b2195615 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -16,12 +16,13 @@ const ( APIEndpoint = "/api/v1" extentionsEndpoint = "/apis/extensions/v1beta1" defaultIngress = "/ingresses" + namespaces = "/namespaces/" ) // Client is a client for the Kubernetes master. type Client interface { GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) - GetServices(predicate func(Service) bool) ([]Service, error) + GetService(name, namespace string) (Service, error) GetEndpoints(name, namespace string) (Endpoints, error) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) } @@ -77,26 +78,20 @@ func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan return c.watch(getURL, stopCh) } -// GetServices returns all services in the cluster -func (c *clientImpl) GetServices(predicate func(Service) bool) ([]Service, error) { - getURL := c.endpointURL + APIEndpoint + "/services" +// GetService returns the named service from the named namespace +func (c *clientImpl) GetService(name, namespace string) (Service, error) { + getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name body, err := c.do(c.request(getURL)) if err != nil { - return nil, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) + return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) } - var serviceList ServiceList - if err := json.Unmarshal(body, &serviceList); err != nil { - return nil, fmt.Errorf("failed to decode list of services resources: %v", err) + var service Service + if err := json.Unmarshal(body, &service); err != nil { + return Service{}, fmt.Errorf("failed to decode service resource: %v", err) } - services := serviceList.Items[:0] - for _, service := range serviceList.Items { - if predicate(service) { - services = append(services, service) - } - } - return services, nil + return service, nil } // WatchServices returns all services in the cluster @@ -108,7 +103,7 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e // GetEndpoints returns the named Endpoints // Endpoints have the same name as the coresponding service func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { - getURL := c.endpointURL + APIEndpoint + "/namespaces/" + namespace + "/endpoints/" + name + getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name body, err := c.do(c.request(getURL)) if err != nil { diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 11556f49c..abb527af6 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -190,49 +190,42 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur Rule: ruleType + ":" + pa.Path, } } - services, err := k8sClient.GetServices(func(service k8s.Service) bool { - return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName - }) + service, err := k8sClient.GetService(pa.Backend.ServiceName, i.ObjectMeta.Namespace) if err != nil { log.Warnf("Error retrieving services: %v", err) - continue - } - if len(services) == 0 { - // no backends found, delete frontend... delete(templateObjects.Frontends, r.Host+pa.Path) log.Warnf("Error retrieving services %s", pa.Backend.ServiceName) + continue } - for _, service := range services { - protocol := "http" - for _, port := range service.Spec.Ports { - if equalPorts(port, pa.Backend.ServicePort) { - if port.Port == 443 { - protocol = "https" + protocol := "http" + for _, port := range service.Spec.Ports { + if equalPorts(port, pa.Backend.ServicePort) { + if port.Port == 443 { + protocol = "https" + } + endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace) + if err != nil { + log.Errorf("Error retrieving endpoints: %v", err) + continue + } + if len(endpoints.Subsets) == 0 { + log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) + templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ + URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), + Weight: 1, } - endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace) - if err != nil { - log.Errorf("Error retrieving endpoints: %v", err) - continue - } - if len(endpoints.Subsets) == 0 { - log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) - templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ - URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), - Weight: 1, - } - } else { - for _, subset := range endpoints.Subsets { - for _, address := range subset.Addresses { - url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) - templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{ - URL: url, - Weight: 1, - } + } else { + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) + templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{ + URL: url, + Weight: 1, } } } - break } + break } } } diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 9f6ddfb32..39a592f1b 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -1212,14 +1212,13 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } -func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service, error) { - var services []k8s.Service +func (c clientMock) GetService(name, namespace string) (k8s.Service, error) { for _, service := range c.services { - if predicate(service) { - services = append(services, service) + if service.Namespace == namespace && service.Name == name { + return service, nil } } - return services, nil + return k8s.Service{}, nil } func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) { From 2e735f622f6b51e0d8dd95984cdf59e5f832dd9d Mon Sep 17 00:00:00 2001 From: Ed Robinson Date: Thu, 26 May 2016 12:09:36 +0100 Subject: [PATCH 3/3] Adds some more coverage of the endpoint port selection logic. --- provider/kubernetes_test.go | 60 +++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index 39a592f1b..2e31a7df9 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -24,7 +24,7 @@ func TestLoadIngresses(t *testing.T) { Path: "/bar", Backend: k8s.IngressBackend{ ServiceName: "service1", - ServicePort: k8s.FromString("http"), + ServicePort: k8s.FromInt(80), }, }, }, @@ -39,7 +39,7 @@ func TestLoadIngresses(t *testing.T) { { Backend: k8s.IngressBackend{ ServiceName: "service3", - ServicePort: k8s.FromInt(443), + ServicePort: k8s.FromString("https"), }, }, { @@ -66,7 +66,6 @@ func TestLoadIngresses(t *testing.T) { ClusterIP: "10.0.0.1", Ports: []k8s.ServicePort{ { - Name: "http", Port: 80, }, }, @@ -98,6 +97,10 @@ func TestLoadIngresses(t *testing.T) { Ports: []k8s.ServicePort{ { Name: "http", + Port: 80, + }, + { + Name: "https", Port: 443, }, }, @@ -138,6 +141,49 @@ func TestLoadIngresses(t *testing.T) { }, }, }, + { + ObjectMeta: k8s.ObjectMeta{ + Name: "service3", + UID: "3", + Namespace: "testing", + }, + Subsets: []k8s.EndpointSubset{ + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.15.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Name: "http", + Port: 8080, + }, + { + Name: "https", + Port: 8443, + }, + }, + }, + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.15.0.2", + }, + }, + Ports: []k8s.EndpointPort{ + { + Name: "http", + Port: 9080, + }, + { + Name: "https", + Port: 9443, + }, + }, + }, + }, + }, } watchChan := make(chan interface{}) client := clientMock{ @@ -174,8 +220,12 @@ func TestLoadIngresses(t *testing.T) { URL: "http://10.0.0.2:802", Weight: 1, }, - "3": { - URL: "https://10.0.0.3:443", + "https://10.15.0.1:8443": { + URL: "https://10.15.0.1:8443", + Weight: 1, + }, + "https://10.15.0.2:9443": { + URL: "https://10.15.0.2:9443", Weight: 1, }, },