From 8765494cbd4d1781ced51d54ee3c5e131110d556 Mon Sep 17 00:00:00 2001 From: Michael Date: Tue, 22 Aug 2017 11:46:03 +0200 Subject: [PATCH] Add support for several ECS backends --- cmd/traefik/traefik.go | 2 + docs/toml.md | 21 +++- provider/ecs/cluster.go | 32 ++++++ provider/ecs/cluster_test.go | 80 +++++++++++++ provider/ecs/ecs.go | 215 ++++++++++++++++++++--------------- server/configuration.go | 3 +- traefik.sample.toml | 10 +- 7 files changed, 268 insertions(+), 95 deletions(-) create mode 100644 provider/ecs/cluster.go create mode 100644 provider/ecs/cluster_test.go diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index ec51f697f..27f2a122a 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -19,6 +19,7 @@ import ( "github.com/containous/traefik/acme" "github.com/containous/traefik/cluster" "github.com/containous/traefik/log" + "github.com/containous/traefik/provider/ecs" "github.com/containous/traefik/provider/kubernetes" "github.com/containous/traefik/provider/rancher" "github.com/containous/traefik/safe" @@ -144,6 +145,7 @@ Complete documentation is available at https://traefik.io`, f.AddParser(reflect.TypeOf(server.RootCAs{}), &server.RootCAs{}) f.AddParser(reflect.TypeOf(types.Constraints{}), &types.Constraints{}) f.AddParser(reflect.TypeOf(kubernetes.Namespaces{}), &kubernetes.Namespaces{}) + f.AddParser(reflect.TypeOf(ecs.Clusters{}), &ecs.Clusters{}) f.AddParser(reflect.TypeOf([]acme.Domain{}), &acme.Domains{}) f.AddParser(reflect.TypeOf(types.Buckets{}), &types.Buckets{}) diff --git a/docs/toml.md b/docs/toml.md index a5215bf32..112a1f64a 100644 --- a/docs/toml.md +++ b/docs/toml.md @@ -1708,10 +1708,16 @@ Træfik can be configured to use Amazon ECS as a backend configuration: # ECS Cluster Name # -# Optional -# Default: "default" +# Deprecated - Please use Clusters # -Cluster = "default" +# Cluster = "default" + +# ECS Clusters Name +# +# Optional +# Default: ["default"] +# +Clusters = ["default"] # Enable watch ECS changes # @@ -1720,6 +1726,13 @@ Cluster = "default" # Watch = true +# Enable auto discover ECS clusters +# +# Optional +# Default: false +# +AutoDiscoverClusters = false + # Polling interval (in seconds) # # Optional @@ -1780,6 +1793,8 @@ Træfik needs the following policy to read ECS information: "Sid": "Traefik ECS read access", "Effect": "Allow", "Action": [ + "ecs:ListClusters", + "ecs:DescribeClusters", "ecs:ListTasks", "ecs:DescribeTasks", "ecs:DescribeContainerInstances", diff --git a/provider/ecs/cluster.go b/provider/ecs/cluster.go new file mode 100644 index 000000000..cb602b9be --- /dev/null +++ b/provider/ecs/cluster.go @@ -0,0 +1,32 @@ +package ecs + +import ( + "fmt" + "strings" +) + +// Clusters holds ecs clusters name +type Clusters []string + +// Set adds strings elem into the the parser +// it splits str on , and ; +func (c *Clusters) Set(str string) error { + fargs := func(c rune) bool { + return c == ',' || c == ';' + } + // get function + slice := strings.FieldsFunc(str, fargs) + *c = append(*c, slice...) + return nil +} + +// Get Clusters +func (c *Clusters) Get() interface{} { return Clusters(*c) } + +// String return slice in a string +func (c *Clusters) String() string { return fmt.Sprintf("%v", *c) } + +// SetValue sets Clusters into the parser +func (c *Clusters) SetValue(val interface{}) { + *c = Clusters(val.(Clusters)) +} diff --git a/provider/ecs/cluster_test.go b/provider/ecs/cluster_test.go new file mode 100644 index 000000000..355dfa9fd --- /dev/null +++ b/provider/ecs/cluster_test.go @@ -0,0 +1,80 @@ +package ecs + +import ( + "reflect" + "testing" +) + +func TestClustersSet(t *testing.T) { + checkMap := map[string]Clusters{ + "cluster": {"cluster"}, + "cluster1,cluster2": {"cluster1", "cluster2"}, + "cluster1;cluster2": {"cluster1", "cluster2"}, + "cluster1,cluster2;cluster3": {"cluster1", "cluster2", "cluster3"}, + } + for str, check := range checkMap { + var clusters Clusters + if err := clusters.Set(str); err != nil { + t.Fatalf("Error :%s", err) + } + if !reflect.DeepEqual(clusters, check) { + t.Fatalf("Expected:%s\ngot:%s", check, clusters) + } + } +} + +func TestClustersGet(t *testing.T) { + slices := []Clusters{ + {"cluster"}, + {"cluster1", "cluster2"}, + {"cluster1", "cluster2", "cluster3"}, + } + check := []Clusters{ + {"cluster"}, + {"cluster1", "cluster2"}, + {"cluster1", "cluster2", "cluster3"}, + } + for i, slice := range slices { + if !reflect.DeepEqual(slice.Get(), check[i]) { + t.Fatalf("Expected:%s\ngot:%s", check[i], slice) + } + } +} + +func TestClustersString(t *testing.T) { + slices := []Clusters{ + {"cluster"}, + {"cluster1", "cluster2"}, + {"cluster1", "cluster2", "cluster3"}, + } + check := []string{ + "[cluster]", + "[cluster1 cluster2]", + "[cluster1 cluster2 cluster3]", + } + for i, slice := range slices { + if !reflect.DeepEqual(slice.String(), check[i]) { + t.Fatalf("Expected:%s\ngot:%s", check[i], slice) + } + } +} + +func TestClustersSetValue(t *testing.T) { + check := []Clusters{ + {"cluster"}, + {"cluster1", "cluster2"}, + {"cluster1", "cluster2", "cluster3"}, + } + slices := []Clusters{ + {"cluster"}, + {"cluster1", "cluster2"}, + {"cluster1", "cluster2", "cluster3"}, + } + for i, s := range slices { + var slice Clusters + slice.SetValue(s) + if !reflect.DeepEqual(slice, check[i]) { + t.Fatalf("Expected:%s\ngot:%s", check[i], slice) + } + } +} diff --git a/provider/ecs/ecs.go b/provider/ecs/ecs.go index 836d1057a..b1e93ec82 100644 --- a/provider/ecs/ecs.go +++ b/provider/ecs/ecs.go @@ -36,10 +36,12 @@ type Provider struct { RefreshSeconds int `description:"Polling interval (in seconds)"` // Provider lookup parameters - Cluster string `description:"ECS Cluster Name"` - Region string `description:"The AWS region to use for requests"` - AccessKeyID string `description:"The AWS credentials access key to use for making requests"` - SecretAccessKey string `description:"The AWS credentials access key to use for making requests"` + Clusters Clusters `description:"ECS Clusters name"` + Cluster string `description:"deprecated - ECS Cluster name"` // deprecated + AutoDiscoverClusters bool `description:"Auto discover cluster"` + Region string `description:"The AWS region to use for requests"` + AccessKeyID string `description:"The AWS credentials access key to use for making requests"` + SecretAccessKey string `description:"The AWS credentials access key to use for making requests"` } type ecsInstance struct { @@ -200,103 +202,138 @@ func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types // and the EC2 instance data func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) { var taskArns []*string - req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{ - Cluster: &p.Cluster, - DesiredStatus: aws.String(ecs.DesiredStatusRunning), - }) - - for ; req != nil; req = req.NextPage() { - if err := wrapAws(ctx, req); err != nil { - return nil, err - } - - taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...) - } - - // Early return: if we can't list tasks we have nothing to - // describe below - likely empty cluster/permissions are bad. This - // stops the AWS API from returning a 401 when you DescribeTasks - // with no input. - if len(taskArns) == 0 { - return []ecsInstance{}, nil - } - - chunkedTaskArns := p.chunkedTaskArns(taskArns) - var tasks []*ecs.Task - - for _, arns := range chunkedTaskArns { - req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{ - Tasks: arns, - Cluster: &p.Cluster, - }) - - if err := wrapAws(ctx, req); err != nil { - return nil, err - } - tasks = append(tasks, taskResp.Tasks...) - - } - - containerInstanceArns := make([]*string, 0) - byContainerInstance := make(map[string]int) - - taskDefinitionArns := make([]*string, 0) - byTaskDefinition := make(map[string]int) - - for _, task := range tasks { - if _, found := byContainerInstance[*task.ContainerInstanceArn]; !found { - byContainerInstance[*task.ContainerInstanceArn] = len(containerInstanceArns) - containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn) - } - if _, found := byTaskDefinition[*task.TaskDefinitionArn]; !found { - byTaskDefinition[*task.TaskDefinitionArn] = len(taskDefinitionArns) - taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn) - } - } - - machines, err := p.lookupEc2Instances(ctx, client, containerInstanceArns) - if err != nil { - return nil, err - } - - taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns) - if err != nil { - return nil, err - } - var instances []ecsInstance - for _, task := range tasks { + var clustersArn []*string + var clusters Clusters - machineIdx := byContainerInstance[*task.ContainerInstanceArn] - taskDefIdx := byTaskDefinition[*task.TaskDefinitionArn] - - for _, container := range task.Containers { - - taskDefinition := taskDefinitions[taskDefIdx] - var containerDefinition *ecs.ContainerDefinition - for _, def := range taskDefinition.ContainerDefinitions { - if *container.Name == *def.Name { - containerDefinition = def + if p.AutoDiscoverClusters { + input := &ecs.ListClustersInput{} + for { + result, err := client.ecs.ListClusters(input) + if err != nil { + return nil, err + } + if result != nil { + clustersArn = append(clustersArn, result.ClusterArns...) + input.NextToken = result.NextToken + if result.NextToken == nil { break } + } else { + break + } + } + for _, carns := range clustersArn { + clusters = append(clusters, *carns) + } + } else if p.Cluster != "" { + // TODO: Deprecated configuration - Need to be removed in the future + clusters = Clusters{p.Cluster} + log.Warn("Deprecated configuration found: ecs.cluster " + + "Please use ecs.clusters instead.") + } else { + clusters = p.Clusters + } + log.Debugf("ECS Clusters: %s", clusters) + for _, c := range clusters { + + req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{ + Cluster: &c, + DesiredStatus: aws.String(ecs.DesiredStatusRunning), + }) + + for ; req != nil; req = req.NextPage() { + if err := wrapAws(ctx, req); err != nil { + return nil, err } - instances = append(instances, ecsInstance{ - fmt.Sprintf("%s-%s", strings.Replace(*task.Group, ":", "-", 1), *container.Name), - (*task.TaskArn)[len(*task.TaskArn)-12:], - task, - taskDefinition, - container, - containerDefinition, - machines[machineIdx], + taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...) + } + + // Early return: if we can't list tasks we have nothing to + // describe below - likely empty cluster/permissions are bad. This + // stops the AWS API from returning a 401 when you DescribeTasks + // with no input. + if len(taskArns) == 0 { + return []ecsInstance{}, nil + } + + chunkedTaskArns := p.chunkedTaskArns(taskArns) + var tasks []*ecs.Task + + for _, arns := range chunkedTaskArns { + req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{ + Tasks: arns, + Cluster: &c, }) + + if err := wrapAws(ctx, req); err != nil { + return nil, err + } + tasks = append(tasks, taskResp.Tasks...) + + } + + containerInstanceArns := make([]*string, 0) + byContainerInstance := make(map[string]int) + + taskDefinitionArns := make([]*string, 0) + byTaskDefinition := make(map[string]int) + + for _, task := range tasks { + if _, found := byContainerInstance[*task.ContainerInstanceArn]; !found { + byContainerInstance[*task.ContainerInstanceArn] = len(containerInstanceArns) + containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn) + } + if _, found := byTaskDefinition[*task.TaskDefinitionArn]; !found { + byTaskDefinition[*task.TaskDefinitionArn] = len(taskDefinitionArns) + taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn) + } + } + + machines, err := p.lookupEc2Instances(ctx, client, &c, containerInstanceArns) + if err != nil { + return nil, err + } + + taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns) + if err != nil { + return nil, err + } + + for _, task := range tasks { + + machineIdx := byContainerInstance[*task.ContainerInstanceArn] + taskDefIdx := byTaskDefinition[*task.TaskDefinitionArn] + + for _, container := range task.Containers { + + taskDefinition := taskDefinitions[taskDefIdx] + var containerDefinition *ecs.ContainerDefinition + for _, def := range taskDefinition.ContainerDefinitions { + if *container.Name == *def.Name { + containerDefinition = def + break + } + } + + instances = append(instances, ecsInstance{ + fmt.Sprintf("%s-%s", strings.Replace(*task.Group, ":", "-", 1), *container.Name), + (*task.TaskArn)[len(*task.TaskArn)-12:], + task, + taskDefinition, + container, + containerDefinition, + machines[machineIdx], + }) + } } } return instances, nil } -func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, containerArns []*string) ([]*ec2.Instance, error) { +func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, containerArns []*string) ([]*ec2.Instance, error) { order := make(map[string]int) instanceIds := make([]*string, len(containerArns)) @@ -307,7 +344,7 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, co req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{ ContainerInstances: containerArns, - Cluster: &p.Cluster, + Cluster: clusterName, }) for ; req != nil; req = req.NextPage() { diff --git a/server/configuration.go b/server/configuration.go index a85d8ab7b..c6cbd560a 100644 --- a/server/configuration.go +++ b/server/configuration.go @@ -534,8 +534,9 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration { var defaultECS ecs.Provider defaultECS.Watch = true defaultECS.ExposedByDefault = true + defaultECS.AutoDiscoverClusters = false + defaultECS.Clusters = ecs.Clusters{"default"} defaultECS.RefreshSeconds = 15 - defaultECS.Cluster = "default" defaultECS.Constraints = types.Constraints{} //default Rancher diff --git a/traefik.sample.toml b/traefik.sample.toml index 07d0c6b7e..c3560316f 100644 --- a/traefik.sample.toml +++ b/traefik.sample.toml @@ -1095,11 +1095,17 @@ # ECS Cluster Name # -# Optional -# Default: "default" +# Deprecated - Please use Clusters # # Cluster = "default" +# ECS Clusters Name +# +# Optional +# Default: ["default"] +# +# Clusters = ["default"] + # Enable watch ECS changes # # Optional