diff --git a/cmd.go b/cmd.go index c65bf28a9..4dc18db42 100644 --- a/cmd.go +++ b/cmd.go @@ -39,28 +39,30 @@ var versionCmd = &cobra.Command{ var arguments = struct { GlobalConfiguration - web bool - file bool - docker bool - dockerTLS bool - marathon bool - consul bool - zookeeper bool - etcd bool - boltdb bool + web bool + file bool + docker bool + dockerTLS bool + marathon bool + consul bool + consulCatalog bool + zookeeper bool + etcd bool + boltdb bool }{ GlobalConfiguration{ EntryPoints: make(EntryPoints), Docker: &provider.Docker{ TLS: &provider.DockerTLS{}, }, - File: &provider.File{}, - Web: &WebProvider{}, - Marathon: &provider.Marathon{}, - Consul: &provider.Consul{}, - Zookeeper: &provider.Zookepper{}, - Etcd: &provider.Etcd{}, - Boltdb: &provider.BoltDb{}, + File: &provider.File{}, + Web: &WebProvider{}, + Marathon: &provider.Marathon{}, + Consul: &provider.Consul{}, + ConsulCatalog: &provider.ConsulCatalog{}, + Zookeeper: &provider.Zookepper{}, + Etcd: &provider.Etcd{}, + Boltdb: &provider.BoltDb{}, }, false, false, @@ -71,6 +73,7 @@ var arguments = struct { false, false, false, + false, } func init() { @@ -119,6 +122,10 @@ func init() { traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Endpoint, "consul.endpoint", "127.0.0.1:8500", "Consul server endpoint") traefikCmd.PersistentFlags().StringVar(&arguments.Consul.Prefix, "consul.prefix", "/traefik", "Prefix used for KV store") + traefikCmd.PersistentFlags().BoolVar(&arguments.consulCatalog, "consulCatalog", false, "Enable Consul catalog backend") + traefikCmd.PersistentFlags().StringVar(&arguments.ConsulCatalog.Domain, "consulCatalog.domain", "", "Default domain used") + traefikCmd.PersistentFlags().StringVar(&arguments.ConsulCatalog.Endpoint, "consulCatalog.endpoint", "127.0.0.1:8500", "Consul server endpoint") + traefikCmd.PersistentFlags().BoolVar(&arguments.zookeeper, "zookeeper", false, "Enable Zookeeper backend") traefikCmd.PersistentFlags().BoolVar(&arguments.Zookeeper.Watch, "zookeeper.watch", true, "Watch provider") traefikCmd.PersistentFlags().StringVar(&arguments.Zookeeper.Filename, "zookeeper.filename", "", "Override default configuration template. For advanced users :)") diff --git a/configuration.go b/configuration.go index facd3e8f4..f331c015f 100644 --- a/configuration.go +++ b/configuration.go @@ -30,6 +30,7 @@ type GlobalConfiguration struct { Web *WebProvider Marathon *provider.Marathon Consul *provider.Consul + ConsulCatalog *provider.ConsulCatalog Etcd *provider.Etcd Zookeeper *provider.Zookepper Boltdb *provider.BoltDb @@ -224,6 +225,9 @@ func LoadConfiguration() *GlobalConfiguration { if arguments.consul { viper.Set("consul", arguments.Consul) } + if arguments.consulCatalog { + viper.Set("consulCatalog", arguments.ConsulCatalog) + } if arguments.zookeeper { viper.Set("zookeeper", arguments.Zookeeper) } diff --git a/docs/index.md b/docs/index.md index 3d4034be5..b4d7d4891 100644 --- a/docs/index.md +++ b/docs/index.md @@ -12,6 +12,7 @@ ___ - [Docker backend](#docker) - [Mesos/Marathon backend](#marathon) - [Consul backend](#consul) +- [Consul catalog backend](#consulcatalog) - [Etcd backend](#etcd) - [Zookeeper backend](#zk) - [Boltdb backend](#boltdb) @@ -109,6 +110,9 @@ Flags: --consul.filename string Override default configuration template. For advanced users :) --consul.prefix string Prefix used for KV store (default "/traefik") --consul.watch Watch provider (default true) + --consulCatalog Enable Consul catalog backend + --consulCatalog.domain string Default domain used + --consulCatalog.endpoint string Consul server endpoint (default "127.0.0.1:8500") --defaultEntryPoints value Entrypoints to be used by frontends that do not specify any entrypoint (default &main.DefaultEntryPoints(nil)) --docker Enable Docker backend --docker.domain string Default domain used @@ -842,6 +846,37 @@ The Keys-Values structure should look (using `prefix = "/traefik"`): | `/traefik/frontends/frontend2/routes/test_2/value` | `/test` | +## Consul catalog backend + +Træfɪk can be configured to use service discovery catalog of Consul as a backend configuration: + +```toml +################################################################ +# Consul Catalog configuration backend +################################################################ + +# Enable Consul Catalog configuration backend +# +# Optional +# +[consulCatalog] + +# Consul server endpoint +# +# Required +# +endpoint = "127.0.0.1:8500" + +# Default domain used. +# +# Optional +# +domain = "consul.localhost" +``` + +This backend will create routes matching on hostname based on the service name +used in consul. + ## Zookeeper backend Træfɪk can be configured to use Zookeeper as a backend configuration: diff --git a/glide.yaml b/glide.yaml index e0aacec11..1d15ab1c3 100644 --- a/glide.yaml +++ b/glide.yaml @@ -124,7 +124,7 @@ import: - package: gopkg.in/alecthomas/kingpin.v2 ref: 639879d6110b1b0409410c7b737ef0bb18325038 - package: github.com/docker/libcompose - ref: 79ef5d150f053a5b12f16b02d8844ed7cf33611a + ref: d3089811c119a211469a9cc93caea684d937e5d3 subpackages: - docker - logger diff --git a/integration/consul_catalog_test.go b/integration/consul_catalog_test.go new file mode 100644 index 000000000..18891cba7 --- /dev/null +++ b/integration/consul_catalog_test.go @@ -0,0 +1,132 @@ +package main + +import ( + "fmt" + "io/ioutil" + "net/http" + "os" + "os/exec" + "time" + + "github.com/docker/docker/opts" + "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/consul/api" + checker "github.com/vdemeester/shakers" + check "gopkg.in/check.v1" +) + +// Consul catalog test suites +type ConsulCatalogSuite struct { + BaseSuite + consulIP string + consulClient *api.Client + dockerClient *docker.Client +} + +func (s *ConsulCatalogSuite) GetContainer(name string) (*docker.Container, error) { + return s.dockerClient.InspectContainer(name) +} + +func (s *ConsulCatalogSuite) SetUpSuite(c *check.C) { + dockerHost := os.Getenv("DOCKER_HOST") + if dockerHost == "" { + // FIXME Handle windows -- see if dockerClient already handle that or not + dockerHost = fmt.Sprintf("unix://%s", opts.DefaultUnixSocket) + } + // Make sure we can speak to docker + dockerClient, err := docker.NewClient(dockerHost) + c.Assert(err, checker.IsNil, check.Commentf("Error connecting to docker daemon")) + s.dockerClient = dockerClient + + s.createComposeProject(c, "consul_catalog") + err = s.composeProject.Up() + c.Assert(err, checker.IsNil, check.Commentf("Error starting project")) + + consul, err := s.GetContainer("integration-test-consul_catalog_consul_1") + c.Assert(err, checker.IsNil, check.Commentf("Error finding consul container")) + + s.consulIP = consul.NetworkSettings.IPAddress + config := api.DefaultConfig() + config.Address = s.consulIP + ":8500" + consulClient, err := api.NewClient(config) + if err != nil { + c.Fatalf("Error creating consul client") + } + s.consulClient = consulClient + + // Wait for consul to elect itself leader + time.Sleep(2000 * time.Millisecond) +} + +func (s *ConsulCatalogSuite) registerService(name string, address string, port int) error { + catalog := s.consulClient.Catalog() + _, err := catalog.Register( + &api.CatalogRegistration{ + Node: address, + Address: address, + Service: &api.AgentService{ + ID: name, + Service: name, + Address: address, + Port: port, + }, + }, + &api.WriteOptions{}, + ) + return err +} + +func (s *ConsulCatalogSuite) deregisterService(name string, address string) error { + catalog := s.consulClient.Catalog() + _, err := catalog.Deregister( + &api.CatalogDeregistration{ + Node: address, + Address: address, + ServiceID: name, + }, + &api.WriteOptions{}, + ) + return err +} + +func (s *ConsulCatalogSuite) TestSimpleConfiguration(c *check.C) { + cmd := exec.Command(traefikBinary, "--consulCatalog", "--consulCatalog.endpoint="+s.consulIP+":8500", "--configFile=fixtures/consul_catalog/simple.toml") + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + time.Sleep(500 * time.Millisecond) + // TODO validate : run on 80 + resp, err := http.Get("http://127.0.0.1:8000/") + + // Expected a 404 as we did not configure anything + c.Assert(err, checker.IsNil) + c.Assert(resp.StatusCode, checker.Equals, 404) +} + +func (s *ConsulCatalogSuite) TestSingleService(c *check.C) { + cmd := exec.Command(traefikBinary, "--consulCatalog", "--consulCatalog.endpoint="+s.consulIP+":8500", "--consulCatalog.domain=consul.localhost", "--configFile=fixtures/consul_catalog/simple.toml") + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + nginx, err := s.GetContainer("integration-test-consul_catalog_nginx_1") + c.Assert(err, checker.IsNil, check.Commentf("Error finding nginx container")) + + err = s.registerService("test", nginx.NetworkSettings.IPAddress, 80) + c.Assert(err, checker.IsNil, check.Commentf("Error registering service")) + defer s.deregisterService("test", nginx.NetworkSettings.IPAddress) + + time.Sleep(5000 * time.Millisecond) + client := &http.Client{} + req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil) + c.Assert(err, checker.IsNil) + req.Host = "test.consul.localhost" + resp, err := client.Do(req) + + c.Assert(err, checker.IsNil) + c.Assert(resp.StatusCode, checker.Equals, 200) + + _, err = ioutil.ReadAll(resp.Body) + c.Assert(err, checker.IsNil) +} diff --git a/integration/fixtures/consul_catalog/simple.toml b/integration/fixtures/consul_catalog/simple.toml new file mode 100644 index 000000000..45988382a --- /dev/null +++ b/integration/fixtures/consul_catalog/simple.toml @@ -0,0 +1,9 @@ +defaultEntryPoints = ["http"] +logLevel = "DEBUG" + +[entryPoints] + [entryPoints.http] + address = ":8000" + +[consulCatalog] +domain = "consul.localhost" diff --git a/integration/integration_test.go b/integration/integration_test.go index 27b6d41f5..828bb689e 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -28,6 +28,7 @@ func init() { check.Suite(&FileSuite{}) check.Suite(&DockerSuite{}) check.Suite(&ConsulSuite{}) + check.Suite(&ConsulCatalogSuite{}) check.Suite(&MarathonSuite{}) } @@ -80,7 +81,9 @@ func (s *BaseSuite) TearDownSuite(c *check.C) { func (s *BaseSuite) createComposeProject(c *check.C, name string) { composeProject, err := docker.NewProject(&docker.Context{ Context: project.Context{ - ComposeFile: fmt.Sprintf("resources/compose/%s.yml", name), + ComposeFiles: []string{ + fmt.Sprintf("resources/compose/%s.yml", name), + }, ProjectName: fmt.Sprintf("integration-test-%s", name), }, }) diff --git a/integration/resources/compose/consul_catalog.yml b/integration/resources/compose/consul_catalog.yml new file mode 100644 index 000000000..9a2688904 --- /dev/null +++ b/integration/resources/compose/consul_catalog.yml @@ -0,0 +1,17 @@ +consul: + image: progrium/consul + command: -server -bootstrap -log-level debug -ui-dir /ui + ports: + - "8400:8400" + - "8500:8500" + - "8600:53/udp" + expose: + - "8300" + - "8301" + - "8301/udp" + - "8302" + - "8302/udp" +nginx: + image: nginx + ports: + - "8881:80" diff --git a/integration/resources/compose/marathon.yml b/integration/resources/compose/marathon.yml index 91685e963..d489db2fa 100644 --- a/integration/resources/compose/marathon.yml +++ b/integration/resources/compose/marathon.yml @@ -3,7 +3,7 @@ zk: net: host environment: ZK_CONFIG: tickTime=2000,initLimit=10,syncLimit=5,maxClientCnxns=128,forceSync=no,clientPort=2181 - ZK_ID: 1 + ZK_ID: " 1" master: image: mesosphere/mesos-master:0.23.0-1.0.ubuntu1404 @@ -12,7 +12,7 @@ master: MESOS_ZK: zk://127.0.0.1:2181/mesos MESOS_HOSTNAME: 127.0.0.1 MESOS_IP: 127.0.0.1 - MESOS_QUORUM: 1 + MESOS_QUORUM: " 1" MESOS_CLUSTER: docker-compose MESOS_WORK_DIR: /var/lib/mesos diff --git a/provider/consul_catalog.go b/provider/consul_catalog.go new file mode 100644 index 000000000..5315ec8b4 --- /dev/null +++ b/provider/consul_catalog.go @@ -0,0 +1,199 @@ +package provider + +import ( + "errors" + "strings" + "text/template" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/cenkalti/backoff" + "github.com/emilevauge/traefik/types" + "github.com/hashicorp/consul/api" +) + +const ( + // DefaultWatchWaitTime is the duration to wait when polling consul + DefaultWatchWaitTime = 15 * time.Second +) + +// ConsulCatalog holds configurations of the Consul catalog provider. +type ConsulCatalog struct { + BaseProvider `mapstructure:",squash"` + Endpoint string + Domain string + client *api.Client +} + +type catalogUpdate struct { + Service string + Nodes []*api.ServiceEntry +} + +func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[string][]string { + watchCh := make(chan map[string][]string) + + catalog := provider.client.Catalog() + + go func() { + defer close(watchCh) + + opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} + + for { + select { + case <-stopCh: + return + default: + } + + data, meta, err := catalog.Services(opts) + if err != nil { + log.WithError(err).Errorf("Failed to list services") + return + } + + // If LastIndex didn't change then it means `Get` returned + // because of the WaitTime and the key didn't changed. + if opts.WaitIndex == meta.LastIndex { + continue + } + opts.WaitIndex = meta.LastIndex + + if data != nil { + watchCh <- data + } + } + }() + + return watchCh +} + +func (provider *ConsulCatalog) healthyNodes(service string) (catalogUpdate, error) { + health := provider.client.Health() + opts := &api.QueryOptions{} + data, _, err := health.Service(service, "", true, opts) + if err != nil { + log.WithError(err).Errorf("Failed to fetch details of " + service) + return catalogUpdate{}, err + } + + return catalogUpdate{ + Service: service, + Nodes: data, + }, nil +} + +func (provider *ConsulCatalog) getBackend(node *api.ServiceEntry) string { + return strings.ToLower(node.Service.Service) +} + +func (provider *ConsulCatalog) getFrontendValue(service string) string { + return service + "." + provider.Domain +} + +func (provider *ConsulCatalog) buildConfig(catalog []catalogUpdate) *types.Configuration { + var FuncMap = template.FuncMap{ + "getBackend": provider.getBackend, + "getFrontendValue": provider.getFrontendValue, + "replace": replace, + } + + allNodes := []*api.ServiceEntry{} + serviceNames := []string{} + for _, info := range catalog { + if len(info.Nodes) > 0 { + serviceNames = append(serviceNames, info.Service) + allNodes = append(allNodes, info.Nodes...) + } + } + + templateObjects := struct { + Services []string + Nodes []*api.ServiceEntry + }{ + Services: serviceNames, + Nodes: allNodes, + } + + configuration, err := provider.getConfiguration("templates/consul_catalog.tmpl", FuncMap, templateObjects) + if err != nil { + log.WithError(err).Error("Failed to create config") + } + + return configuration +} + +func (provider *ConsulCatalog) getNodes(index map[string][]string) ([]catalogUpdate, error) { + visited := make(map[string]bool) + + nodes := []catalogUpdate{} + for service := range index { + name := strings.ToLower(service) + if !strings.Contains(name, " ") && !visited[name] { + visited[name] = true + log.WithFields(log.Fields{ + "service": name, + }).Debug("Fetching service") + healthy, err := provider.healthyNodes(name) + if err != nil { + return nil, err + } + nodes = append(nodes, healthy) + } + } + return nodes, nil +} + +func (provider *ConsulCatalog) watch(configurationChan chan<- types.ConfigMessage) error { + stopCh := make(chan struct{}) + serviceCatalog := provider.watchServices(stopCh) + + defer close(stopCh) + + for { + select { + case index, ok := <-serviceCatalog: + if !ok { + return errors.New("Consul service list nil") + } + log.Debug("List of services changed") + nodes, err := provider.getNodes(index) + if err != nil { + return err + } + configuration := provider.buildConfig(nodes) + configurationChan <- types.ConfigMessage{ + ProviderName: "consul_catalog", + Configuration: configuration, + } + } + } +} + +// Provide allows the provider to provide configurations to traefik +// using the given configuration channel. +func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMessage) error { + config := api.DefaultConfig() + config.Address = provider.Endpoint + client, err := api.NewClient(config) + if err != nil { + return err + } + provider.client = client + + go func() { + notify := func(err error, time time.Duration) { + log.Errorf("Consul connection error %+v, retrying in %s", err, time) + } + worker := func() error { + return provider.watch(configurationChan) + } + err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify) + if err != nil { + log.Fatalf("Cannot connect to consul server %+v", err) + } + }() + + return err +} diff --git a/provider/consul_catalog_test.go b/provider/consul_catalog_test.go new file mode 100644 index 000000000..987f25c5b --- /dev/null +++ b/provider/consul_catalog_test.go @@ -0,0 +1,110 @@ +package provider + +import ( + "reflect" + "testing" + + "github.com/emilevauge/traefik/types" + "github.com/hashicorp/consul/api" +) + +func TestConsulCatalogGetFrontendValue(t *testing.T) { + provider := &ConsulCatalog{ + Domain: "localhost", + } + + services := []struct { + service string + expected string + }{ + { + service: "foo", + expected: "foo.localhost", + }, + } + + for _, e := range services { + actual := provider.getFrontendValue(e.service) + if actual != e.expected { + t.Fatalf("expected %q, got %q", e.expected, actual) + } + } +} + +func TestConsulCatalogBuildConfig(t *testing.T) { + provider := &ConsulCatalog{ + Domain: "localhost", + } + + cases := []struct { + nodes []catalogUpdate + expectedFrontends map[string]*types.Frontend + expectedBackends map[string]*types.Backend + }{ + { + nodes: []catalogUpdate{}, + expectedFrontends: map[string]*types.Frontend{}, + expectedBackends: map[string]*types.Backend{}, + }, + { + nodes: []catalogUpdate{ + { + Service: "test", + }, + }, + expectedFrontends: map[string]*types.Frontend{}, + expectedBackends: map[string]*types.Backend{}, + }, + { + nodes: []catalogUpdate{ + { + Service: "test", + Nodes: []*api.ServiceEntry{ + { + Service: &api.AgentService{ + Service: "test", + Port: 80, + }, + Node: &api.Node{ + Node: "localhost", + Address: "127.0.0.1", + }, + }, + }, + }, + }, + expectedFrontends: map[string]*types.Frontend{ + "frontend-test": { + Backend: "backend-test", + Routes: map[string]types.Route{ + "route-host-test": { + Rule: "Host", + Value: "test.localhost", + }, + }, + }, + }, + expectedBackends: map[string]*types.Backend{ + "backend-test": { + Servers: map[string]types.Server{ + "server-localhost-80": { + URL: "http://127.0.0.1:80", + }, + }, + CircuitBreaker: nil, + LoadBalancer: nil, + }, + }, + }, + } + + for _, c := range cases { + actualConfig := provider.buildConfig(c.nodes) + if !reflect.DeepEqual(actualConfig.Backends, c.expectedBackends) { + t.Fatalf("expected %#v, got %#v", c.expectedBackends, actualConfig.Backends) + } + if !reflect.DeepEqual(actualConfig.Frontends, c.expectedFrontends) { + t.Fatalf("expected %#v, got %#v", c.expectedFrontends, actualConfig.Frontends) + } + } +} diff --git a/server.go b/server.go index ef920f7c6..97b60d0a8 100644 --- a/server.go +++ b/server.go @@ -184,6 +184,9 @@ func (server *Server) configureProviders() { if server.globalConfiguration.Consul != nil { server.providers = append(server.providers, server.globalConfiguration.Consul) } + if server.globalConfiguration.ConsulCatalog != nil { + server.providers = append(server.providers, server.globalConfiguration.ConsulCatalog) + } if server.globalConfiguration.Etcd != nil { server.providers = append(server.providers, server.globalConfiguration.Etcd) } diff --git a/templates/consul_catalog.tmpl b/templates/consul_catalog.tmpl new file mode 100644 index 000000000..65a7a0d3c --- /dev/null +++ b/templates/consul_catalog.tmpl @@ -0,0 +1,13 @@ +[backends]{{range .Nodes}} + [backends.backend-{{getBackend .}}.servers.server-{{.Node.Node | replace "." "-"}}-{{.Service.Port}}] + url = "http://{{.Node.Address}}:{{.Service.Port}}" +{{end}} + +[frontends]{{range .Services}} + [frontends.frontend-{{.}}] + backend = "backend-{{.}}" + passHostHeader = false + [frontends.frontend-{{.}}.routes.route-host-{{.}}] + rule = "Host" + value = "{{getFrontendValue .}}" +{{end}} diff --git a/traefik.go b/traefik.go index ea5ef7d62..dec978a6f 100644 --- a/traefik.go +++ b/traefik.go @@ -8,7 +8,6 @@ import ( func main() { runtime.GOMAXPROCS(runtime.NumCPU()) - if err := traefikCmd.Execute(); err != nil { fmtlog.Println(err) os.Exit(-1)