diff --git a/provider/k8s/client.go b/provider/k8s/client.go index 957d66a9b..0b2195615 100644 --- a/provider/k8s/client.go +++ b/provider/k8s/client.go @@ -16,12 +16,14 @@ const ( APIEndpoint = "/api/v1" extentionsEndpoint = "/apis/extensions/v1beta1" defaultIngress = "/ingresses" + namespaces = "/namespaces/" ) // Client is a client for the Kubernetes master. type Client interface { GetIngresses(predicate func(Ingress) bool) ([]Ingress, error) - GetServices(predicate func(Service) bool) ([]Service, error) + GetService(name, namespace string) (Service, error) + GetEndpoints(name, namespace string) (Endpoints, error) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) } @@ -76,26 +78,20 @@ func (c *clientImpl) WatchIngresses(stopCh <-chan bool) (chan interface{}, chan return c.watch(getURL, stopCh) } -// GetServices returns all services in the cluster -func (c *clientImpl) GetServices(predicate func(Service) bool) ([]Service, error) { - getURL := c.endpointURL + APIEndpoint + "/services" +// GetService returns the named service from the named namespace +func (c *clientImpl) GetService(name, namespace string) (Service, error) { + getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/services/" + name body, err := c.do(c.request(getURL)) if err != nil { - return nil, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) + return Service{}, fmt.Errorf("failed to create services request: GET %q : %v", getURL, err) } - var serviceList ServiceList - if err := json.Unmarshal(body, &serviceList); err != nil { - return nil, fmt.Errorf("failed to decode list of services resources: %v", err) + var service Service + if err := json.Unmarshal(body, &service); err != nil { + return Service{}, fmt.Errorf("failed to decode service resource: %v", err) } - services := serviceList.Items[:0] - for _, service := range serviceList.Items { - if predicate(service) { - services = append(services, service) - } - } - return services, nil + return service, nil } // WatchServices returns all services in the cluster @@ -104,21 +100,26 @@ func (c *clientImpl) WatchServices(stopCh <-chan bool) (chan interface{}, chan e return c.watch(getURL, stopCh) } -// WatchEvents returns events in the cluster -func (c *clientImpl) WatchEvents(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/events" - return c.watch(getURL, stopCh) +// GetEndpoints returns the named Endpoints +// Endpoints have the same name as the coresponding service +func (c *clientImpl) GetEndpoints(name, namespace string) (Endpoints, error) { + getURL := c.endpointURL + APIEndpoint + namespaces + namespace + "/endpoints/" + name + + body, err := c.do(c.request(getURL)) + if err != nil { + return Endpoints{}, fmt.Errorf("failed to create endpoints request: GET %q : %v", getURL, err) + } + + var endpoints Endpoints + if err := json.Unmarshal(body, &endpoints); err != nil { + return Endpoints{}, fmt.Errorf("failed to decode endpoints resources: %v", err) + } + return endpoints, nil } -// WatchPods returns pods in the cluster -func (c *clientImpl) WatchPods(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/pods" - return c.watch(getURL, stopCh) -} - -// WatchReplicationControllers returns ReplicationControllers in the cluster -func (c *clientImpl) WatchReplicationControllers(stopCh <-chan bool) (chan interface{}, chan error, error) { - getURL := c.endpointURL + APIEndpoint + "/replicationcontrollers" +// WatchEndpoints returns endpoints in the cluster +func (c *clientImpl) WatchEndpoints(stopCh <-chan bool) (chan interface{}, chan error, error) { + getURL := c.endpointURL + APIEndpoint + "/endpoints" return c.watch(getURL, stopCh) } @@ -137,13 +138,8 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } - stopPods := make(chan bool) - chanPods, chanPodsErr, err := c.WatchPods(stopPods) - if err != nil { - return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) - } - stopReplicationControllers := make(chan bool) - chanReplicationControllers, chanReplicationControllersErr, err := c.WatchReplicationControllers(stopReplicationControllers) + stopEndpoints := make(chan bool) + chanEndpoints, chanEndpointsErr, err := c.WatchEndpoints(stopEndpoints) if err != nil { return watchCh, errCh, fmt.Errorf("failed to create watch: %v", err) } @@ -152,32 +148,26 @@ func (c *clientImpl) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, defer close(errCh) defer close(stopIngresses) defer close(stopServices) - defer close(stopPods) - defer close(stopReplicationControllers) + defer close(stopEndpoints) for { select { case <-stopCh: stopIngresses <- true stopServices <- true - stopPods <- true - stopReplicationControllers <- true + stopEndpoints <- true return case err := <-chanIngressesErr: errCh <- err case err := <-chanServicesErr: errCh <- err - case err := <-chanPodsErr: - errCh <- err - case err := <-chanReplicationControllersErr: + case err := <-chanEndpointsErr: errCh <- err case event := <-chanIngresses: watchCh <- event case event := <-chanServices: watchCh <- event - case event := <-chanPods: - watchCh <- event - case event := <-chanReplicationControllers: + case event := <-chanEndpoints: watchCh <- event } } diff --git a/provider/k8s/endpoints.go b/provider/k8s/endpoints.go new file mode 100644 index 000000000..123ffe36c --- /dev/null +++ b/provider/k8s/endpoints.go @@ -0,0 +1,84 @@ +package k8s + +// Endpoints is a collection of endpoints that implement the actual service. Example: +// Name: "mysvc", +// Subsets: [ +// { +// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], +// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] +// }, +// { +// Addresses: [{"ip": "10.10.3.3"}], +// Ports: [{"name": "a", "port": 93}, {"name": "b", "port": 76}] +// }, +// ] +type Endpoints struct { + TypeMeta `json:",inline"` + ObjectMeta `json:"metadata,omitempty"` + + // The set of all endpoints is the union of all subsets. + Subsets []EndpointSubset +} + +// EndpointSubset is a group of addresses with a common set of ports. The +// expanded set of endpoints is the Cartesian product of Addresses x Ports. +// For example, given: +// { +// Addresses: [{"ip": "10.10.1.1"}, {"ip": "10.10.2.2"}], +// Ports: [{"name": "a", "port": 8675}, {"name": "b", "port": 309}] +// } +// The resulting set of endpoints can be viewed as: +// a: [ 10.10.1.1:8675, 10.10.2.2:8675 ], +// b: [ 10.10.1.1:309, 10.10.2.2:309 ] +type EndpointSubset struct { + Addresses []EndpointAddress + NotReadyAddresses []EndpointAddress + Ports []EndpointPort +} + +// EndpointAddress is a tuple that describes single IP address. +type EndpointAddress struct { + // The IP of this endpoint. + // IPv6 is also accepted but not fully supported on all platforms. Also, certain + // kubernetes components, like kube-proxy, are not IPv6 ready. + // TODO: This should allow hostname or IP, see #4447. + IP string + // Optional: Hostname of this endpoint + // Meant to be used by DNS servers etc. + Hostname string `json:"hostname,omitempty"` + // Optional: The kubernetes object related to the entry point. + TargetRef *ObjectReference +} + +// EndpointPort is a tuple that describes a single port. +type EndpointPort struct { + // The name of this port (corresponds to ServicePort.Name). Optional + // if only one port is defined. Must be a DNS_LABEL. + Name string + + // The port number. + Port int32 + + // The IP protocol for this port. + Protocol Protocol +} + +// ObjectReference contains enough information to let you inspect or modify the referred object. +type ObjectReference struct { + Kind string `json:"kind,omitempty"` + Namespace string `json:"namespace,omitempty"` + Name string `json:"name,omitempty"` + UID UID `json:"uid,omitempty"` + APIVersion string `json:"apiVersion,omitempty"` + ResourceVersion string `json:"resourceVersion,omitempty"` + + // Optional. If referring to a piece of an object instead of an entire object, this string + // should contain information to identify the sub-object. For example, if the object + // reference is to a container within a pod, this would take on a value like: + // "spec.containers{name}" (where "name" refers to the name of the container that triggered + // the event) or if no container name is specified "spec.containers[2]" (container with + // index 2 in this pod). This syntax is chosen only to have some well-defined way of + // referencing a part of an object. + // TODO: this design is not final and this field is subject to change in the future. + FieldPath string `json:"fieldPath,omitempty"` +} diff --git a/provider/kubernetes.go b/provider/kubernetes.go index 7362898b2..abb527af6 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -190,31 +190,42 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur Rule: ruleType + ":" + pa.Path, } } - services, err := k8sClient.GetServices(func(service k8s.Service) bool { - return service.ObjectMeta.Namespace == i.ObjectMeta.Namespace && service.Name == pa.Backend.ServiceName - }) + service, err := k8sClient.GetService(pa.Backend.ServiceName, i.ObjectMeta.Namespace) if err != nil { log.Warnf("Error retrieving services: %v", err) - continue - } - if len(services) == 0 { - // no backends found, delete frontend... delete(templateObjects.Frontends, r.Host+pa.Path) log.Warnf("Error retrieving services %s", pa.Backend.ServiceName) + continue } - for _, service := range services { - protocol := "http" - for _, port := range service.Spec.Ports { - if equalPorts(port, pa.Backend.ServicePort) { - if port.Port == 443 { - protocol = "https" - } + protocol := "http" + for _, port := range service.Spec.Ports { + if equalPorts(port, pa.Backend.ServicePort) { + if port.Port == 443 { + protocol = "https" + } + endpoints, err := k8sClient.GetEndpoints(service.ObjectMeta.Name, service.ObjectMeta.Namespace) + if err != nil { + log.Errorf("Error retrieving endpoints: %v", err) + continue + } + if len(endpoints.Subsets) == 0 { + log.Warnf("Endpoints not found for %s/%s, falling back to Service ClusterIP", service.ObjectMeta.Namespace, service.ObjectMeta.Name) templateObjects.Backends[r.Host+pa.Path].Servers[string(service.UID)] = types.Server{ URL: protocol + "://" + service.Spec.ClusterIP + ":" + strconv.Itoa(port.Port), Weight: 1, } - break + } else { + for _, subset := range endpoints.Subsets { + for _, address := range subset.Addresses { + url := protocol + "://" + address.IP + ":" + strconv.Itoa(endpointPortNumber(port, subset.Ports)) + templateObjects.Backends[r.Host+pa.Path].Servers[url] = types.Server{ + URL: url, + Weight: 1, + } + } + } } + break } } } @@ -223,6 +234,20 @@ func (provider *Kubernetes) loadIngresses(k8sClient k8s.Client) (*types.Configur return &templateObjects, nil } +func endpointPortNumber(servicePort k8s.ServicePort, endpointPorts []k8s.EndpointPort) int { + if len(endpointPorts) > 0 { + //name is optional if there is only one port + port := endpointPorts[0] + for _, endpointPort := range endpointPorts { + if servicePort.Name == endpointPort.Name { + port = endpointPort + } + } + return int(port.Port) + } + return servicePort.Port +} + func equalPorts(servicePort k8s.ServicePort, ingressPort k8s.IntOrString) bool { if servicePort.Port == ingressPort.IntValue() { return true diff --git a/provider/kubernetes_test.go b/provider/kubernetes_test.go index b74b0536d..2e31a7df9 100644 --- a/provider/kubernetes_test.go +++ b/provider/kubernetes_test.go @@ -10,6 +10,9 @@ import ( func TestLoadIngresses(t *testing.T) { ingresses := []k8s.Ingress{{ + ObjectMeta: k8s.ObjectMeta{ + Namespace: "testing", + }, Spec: k8s.IngressSpec{ Rules: []k8s.IngressRule{ { @@ -21,7 +24,7 @@ func TestLoadIngresses(t *testing.T) { Path: "/bar", Backend: k8s.IngressBackend{ ServiceName: "service1", - ServicePort: k8s.FromString("http"), + ServicePort: k8s.FromInt(80), }, }, }, @@ -36,7 +39,7 @@ func TestLoadIngresses(t *testing.T) { { Backend: k8s.IngressBackend{ ServiceName: "service3", - ServicePort: k8s.FromInt(443), + ServicePort: k8s.FromString("https"), }, }, { @@ -55,23 +58,24 @@ func TestLoadIngresses(t *testing.T) { services := []k8s.Service{ { ObjectMeta: k8s.ObjectMeta{ - Name: "service1", - UID: "1", + Name: "service1", + UID: "1", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.1", Ports: []k8s.ServicePort{ { - Name: "http", - Port: 801, + Port: 80, }, }, }, }, { ObjectMeta: k8s.ObjectMeta{ - Name: "service2", - UID: "2", + Name: "service2", + UID: "2", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.2", @@ -84,24 +88,108 @@ func TestLoadIngresses(t *testing.T) { }, { ObjectMeta: k8s.ObjectMeta{ - Name: "service3", - UID: "3", + Name: "service3", + UID: "3", + Namespace: "testing", }, Spec: k8s.ServiceSpec{ ClusterIP: "10.0.0.3", Ports: []k8s.ServicePort{ { Name: "http", + Port: 80, + }, + { + Name: "https", Port: 443, }, }, }, }, } + endpoints := []k8s.Endpoints{ + { + ObjectMeta: k8s.ObjectMeta{ + Name: "service1", + UID: "1", + Namespace: "testing", + }, + Subsets: []k8s.EndpointSubset{ + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.10.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Port: 8080, + }, + }, + }, + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.21.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Port: 8080, + }, + }, + }, + }, + }, + { + ObjectMeta: k8s.ObjectMeta{ + Name: "service3", + UID: "3", + Namespace: "testing", + }, + Subsets: []k8s.EndpointSubset{ + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.15.0.1", + }, + }, + Ports: []k8s.EndpointPort{ + { + Name: "http", + Port: 8080, + }, + { + Name: "https", + Port: 8443, + }, + }, + }, + { + Addresses: []k8s.EndpointAddress{ + { + IP: "10.15.0.2", + }, + }, + Ports: []k8s.EndpointPort{ + { + Name: "http", + Port: 9080, + }, + { + Name: "https", + Port: 9443, + }, + }, + }, + }, + }, + } watchChan := make(chan interface{}) client := clientMock{ ingresses: ingresses, services: services, + endpoints: endpoints, watchChan: watchChan, } provider := Kubernetes{} @@ -114,8 +202,12 @@ func TestLoadIngresses(t *testing.T) { Backends: map[string]*types.Backend{ "foo/bar": { Servers: map[string]types.Server{ - "1": { - URL: "http://10.0.0.1:801", + "http://10.10.0.1:8080": { + URL: "http://10.10.0.1:8080", + Weight: 1, + }, + "http://10.21.0.1:8080": { + URL: "http://10.21.0.1:8080", Weight: 1, }, }, @@ -128,8 +220,12 @@ func TestLoadIngresses(t *testing.T) { URL: "http://10.0.0.2:802", Weight: 1, }, - "3": { - URL: "https://10.0.0.3:443", + "https://10.15.0.1:8443": { + URL: "https://10.15.0.1:8443", + Weight: 1, + }, + "https://10.15.0.2:9443": { + URL: "https://10.15.0.2:9443", Weight: 1, }, }, @@ -1150,6 +1246,7 @@ func TestHostlessIngress(t *testing.T) { type clientMock struct { ingresses []k8s.Ingress services []k8s.Service + endpoints []k8s.Endpoints watchChan chan interface{} } @@ -1165,15 +1262,24 @@ func (c clientMock) GetIngresses(predicate func(k8s.Ingress) bool) ([]k8s.Ingres func (c clientMock) WatchIngresses(predicate func(k8s.Ingress) bool, stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil } -func (c clientMock) GetServices(predicate func(k8s.Service) bool) ([]k8s.Service, error) { - var services []k8s.Service +func (c clientMock) GetService(name, namespace string) (k8s.Service, error) { for _, service := range c.services { - if predicate(service) { - services = append(services, service) + if service.Namespace == namespace && service.Name == name { + return service, nil } } - return services, nil + return k8s.Service{}, nil } + +func (c clientMock) GetEndpoints(name, namespace string) (k8s.Endpoints, error) { + for _, endpoints := range c.endpoints { + if endpoints.Namespace == namespace && endpoints.Name == name { + return endpoints, nil + } + } + return k8s.Endpoints{}, nil +} + func (c clientMock) WatchAll(stopCh <-chan bool) (chan interface{}, chan error, error) { return c.watchChan, make(chan error), nil }