traefik/provider/k8s/client.go

271 lines
7.3 KiB
Go

package k8s
import (
"k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
"k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/apis/extensions/v1beta1"
"k8s.io/client-go/1.5/pkg/fields"
"k8s.io/client-go/1.5/pkg/labels"
"k8s.io/client-go/1.5/pkg/runtime"
"k8s.io/client-go/1.5/pkg/watch"
"k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
)
// Client is a client for the Kubernetes master.
// It gives read access to locally cached Ingress, Service and Endpoints
// objects. In clientImpl those caches are only created once WatchAll has
// been called.
type Client interface {
// GetIngresses returns the cached ingresses restricted to the given
// namespaces; clientImpl treats an empty namespace list as "no restriction".
GetIngresses(namespaces Namespaces) []*v1beta1.Ingress
// GetService returns the cached service stored under "namespace/name",
// whether it was found, and any lookup error.
GetService(namespace, name string) (*v1.Service, bool, error)
// GetEndpoints returns the cached endpoints stored under "namespace/name",
// whether they were found, and any lookup error.
GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error)
// WatchAll starts watching ingresses, services and endpoints and returns
// the channel on which their events are delivered. labelSelector filters
// ingresses; stopCh ends the watch.
WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error)
}
// clientImpl is the Client implementation built on client-go informers.
// The constructors only set clientset; the controller and store fields stay
// nil until the matching Watch* method has run.
type clientImpl struct {
// Informer controllers, assigned by WatchIngresses, WatchServices and
// WatchEndpoints respectively. fireEvent consults their HasSynced state.
ingController *cache.Controller
svcController *cache.Controller
epController *cache.Controller
// Stores filled by the informers above and read by GetIngresses,
// GetService and GetEndpoints.
ingStore cache.Store
svcStore cache.Store
epStore cache.Store
// clientset provides the API clients used to build the list/watch sources.
clientset *kubernetes.Clientset
}
// NewInClusterClient returns a new Kubernetes client built from the
// in-cluster configuration reported by rest.InClusterConfig.
// The returned client has no stores yet: WatchAll starts the watch of the
// Kubernetes resources and updates the stores, which can then be accessed
// via the Get* functions.
// Any error from loading the configuration or building the clientset is
// returned unchanged.
func NewInClusterClient() (Client, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	client := &clientImpl{clientset: cs}
	return client, nil
}
// NewInClusterClientWithEndpoint is the same as NewInClusterClient except
// that the Host of the in-cluster configuration is replaced by endpoint
// before the clientset is built.
// Any error from loading the configuration or building the clientset is
// returned unchanged.
func NewInClusterClientWithEndpoint(endpoint string) (Client, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	// Only the API server address is overridden; the rest of the detected
	// in-cluster configuration is kept.
	cfg.Host = endpoint

	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}

	client := &clientImpl{clientset: cs}
	return client, nil
}
// GetIngresses returns the ingresses currently held in the ingress store
// that belong to one of the given namespaces. HasNamespace decides
// membership, so an empty namespaces list selects every ingress.
// The ingress store must have been created by WatchIngresses (normally via
// WatchAll) before this is called.
func (c *clientImpl) GetIngresses(namespaces Namespaces) []*v1beta1.Ingress {
	cached := c.ingStore.List()
	matched := make([]*v1beta1.Ingress, 0, len(cached))

	for i := range cached {
		ing := cached[i].(*v1beta1.Ingress)
		if !HasNamespace(ing, namespaces) {
			continue
		}
		matched = append(matched, ing)
	}

	return matched
}
// WatchIngresses starts the watch of Kubernetes Ingress resources in all
// namespaces, filtered by labelSelector, and keeps the ingress store up to
// date through an informer.
// It assigns the informer's store and controller to the client, runs the
// controller in its own goroutine with stopCh, and returns a channel
// (buffered, capacity 10) that receives the object of every add, update and
// delete notification.
func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, stopCh <-chan struct{}) chan interface{} {
	events := make(chan interface{}, 10)

	listWatch := NewListWatchFromClient(
		c.clientset.ExtensionsClient,
		"ingresses",
		api.NamespaceAll,
		fields.Everything(),
		labelSelector)
	handlers := newResourceEventHandlerFuncs(events)

	store, controller := cache.NewInformer(listWatch, &v1beta1.Ingress{}, 0, handlers)
	c.ingStore, c.ingController = store, controller

	go controller.Run(stopCh)

	return events
}
// newResourceEventHandlerFuncs builds informer handlers that push the
// affected object onto events: the added object, the new version of an
// updated object, or the deleted object. Each send blocks until events has
// room or a receiver.
func newResourceEventHandlerFuncs(events chan interface{}) cache.ResourceEventHandlerFuncs {
	publish := func(obj interface{}) { events <- obj }

	return cache.ResourceEventHandlerFuncs{
		AddFunc:    publish,
		UpdateFunc: func(_, updated interface{}) { publish(updated) },
		DeleteFunc: publish,
	}
}
// GetService returns the named service from the named namespace.
// The lookup is made in the service store under the key "namespace/name".
// The boolean and error are passed through from the store; the service is
// nil whenever the store returned no item.
func (c *clientImpl) GetService(namespace, name string) (*v1.Service, bool, error) {
	key := namespace + "/" + name

	obj, found, err := c.svcStore.GetByKey(key)
	if obj == nil {
		return nil, found, err
	}

	return obj.(*v1.Service), found, err
}
// WatchServices starts the watch of Kubernetes Service resources in all
// namespaces and keeps the service store up to date through an informer.
// It assigns the informer's store and controller to the client, runs the
// controller in its own goroutine with stopCh, and returns a channel
// (buffered, capacity 10) that receives the object of every add, update and
// delete notification.
func (c *clientImpl) WatchServices(stopCh <-chan struct{}) chan interface{} {
	events := make(chan interface{}, 10)

	listWatch := cache.NewListWatchFromClient(
		c.clientset.CoreClient,
		"services",
		api.NamespaceAll,
		fields.Everything())
	handlers := newResourceEventHandlerFuncs(events)

	store, controller := cache.NewInformer(listWatch, &v1.Service{}, 0, handlers)
	c.svcStore, c.svcController = store, controller

	go controller.Run(stopCh)

	return events
}
// GetEndpoints returns the named Endpoints from the named namespace.
// Endpoints have the same name as the corresponding service.
// The lookup is made in the endpoints store under the key "namespace/name".
// The boolean and error are passed through from the store; the endpoints are
// nil whenever the store returned no item.
func (c *clientImpl) GetEndpoints(namespace, name string) (*v1.Endpoints, bool, error) {
	key := namespace + "/" + name

	obj, found, err := c.epStore.GetByKey(key)
	if obj == nil {
		return nil, found, err
	}

	return obj.(*v1.Endpoints), found, err
}
// WatchEndpoints starts the watch of Kubernetes Endpoints resources in all
// namespaces and keeps the endpoints store up to date through an informer.
// It assigns the informer's store and controller to the client, runs the
// controller in its own goroutine with stopCh, and returns a channel
// (buffered, capacity 10) that receives the object of every add, update and
// delete notification.
func (c *clientImpl) WatchEndpoints(stopCh <-chan struct{}) chan interface{} {
	events := make(chan interface{}, 10)

	listWatch := cache.NewListWatchFromClient(
		c.clientset.CoreClient,
		"endpoints",
		api.NamespaceAll,
		fields.Everything())
	handlers := newResourceEventHandlerFuncs(events)

	store, controller := cache.NewInformer(listWatch, &v1.Endpoints{}, 0, handlers)
	c.epStore, c.epController = store, controller

	go controller.Run(stopCh)

	return events
}
// WatchAll starts the ingress, service and endpoints informers, which keep
// the stores up to date, and returns a channel (buffered, capacity 10) on
// which the objects they report are relayed.
//
// labelSelector is parsed with labels.Parse and filters ingresses only; a
// parse error is returned before any watch is started. Events that arrive
// while one of the controllers has not synced yet are dropped. As soon as
// stopCh delivers a value or is closed, the relay goroutine stops: the three
// informer stop channels and the returned channel are closed.
func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan bool) (chan interface{}, error) {
	kubeLabelSelector, err := labels.Parse(labelSelector)
	if err != nil {
		return nil, err
	}

	watchCh := make(chan interface{}, 10)

	stopIngresses := make(chan struct{})
	chanIngresses := c.WatchIngresses(kubeLabelSelector, stopIngresses)

	stopServices := make(chan struct{})
	chanServices := c.WatchServices(stopServices)

	stopEndpoints := make(chan struct{})
	chanEndpoints := c.WatchEndpoints(stopEndpoints)

	go func() {
		defer close(watchCh)
		defer close(stopIngresses)
		defer close(stopServices)
		defer close(stopEndpoints)

		for {
			var event interface{}
			select {
			case <-stopCh:
				return
			case event = <-chanIngresses:
			case event = <-chanServices:
			case event = <-chanEndpoints:
			}

			// The relay must stay responsive to stopCh even when nobody is
			// draining watchCh; otherwise this goroutine would block on the
			// send forever and the informers would never be stopped.
			if !c.fireEventUntil(event, watchCh, stopCh) {
				return
			}
		}
	}()

	return watchCh, nil
}

// fireEvent checks if all controllers have synced before firing.
// Used after startup or a reconnect.
// The send blocks until watchCh accepts the event; WatchAll uses
// fireEventUntil instead so that the send can be interrupted.
func (c *clientImpl) fireEvent(event interface{}, watchCh chan interface{}) {
	// A nil stop channel is never ready in a select, so this is a plain
	// blocking send.
	c.fireEventUntil(event, watchCh, nil)
}

// fireEventUntil sends event on watchCh if the ingress, service and endpoints
// controllers have all synced, and drops it otherwise.
// It returns false only when stopCh became ready while the send was pending,
// in which case the event has not been delivered; it returns true in every
// other case.
func (c *clientImpl) fireEventUntil(event interface{}, watchCh chan interface{}, stopCh <-chan bool) bool {
	if !c.ingController.HasSynced() || !c.svcController.HasSynced() || !c.epController.HasSynced() {
		return true
	}

	select {
	case watchCh <- event:
		return true
	case <-stopCh:
		return false
	}
}
// HasNamespace reports whether the ingress lives in one of the given
// namespaces. An empty namespaces list matches every ingress, and in that
// case the ingress itself is not inspected.
func HasNamespace(ingress *v1beta1.Ingress, namespaces Namespaces) bool {
	if len(namespaces) == 0 {
		return true
	}

	target := ingress.ObjectMeta.Namespace
	for _, candidate := range namespaces {
		if candidate == target {
			return true
		}
	}

	return false
}
// NewListWatchFromClient creates a new ListWatch from the specified client,
// resource, namespace, field selector and label selector.
// It extends cache.NewListWatchFromClient to support labelSelector: both the
// list request and the watch request carry the field selector and the label
// selector in addition to the versioned list options.
func NewListWatchFromClient(c cache.Getter, resource string, namespace string, fieldSelector fields.Selector, labelSelector labels.Selector) *cache.ListWatch {
	return &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			req := c.Get().
				Namespace(namespace).
				Resource(resource)
			req = req.VersionedParams(&options, api.ParameterCodec)
			req = req.FieldsSelectorParam(fieldSelector)
			req = req.LabelsSelectorParam(labelSelector)
			return req.Do().Get()
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			// Same request as the list, addressed to the "watch" prefix.
			req := c.Get().
				Prefix("watch").
				Namespace(namespace).
				Resource(resource)
			req = req.VersionedParams(&options, api.ParameterCodec)
			req = req.FieldsSelectorParam(fieldSelector)
			req = req.LabelsSelectorParam(labelSelector)
			return req.Watch()
		},
	}
}