From 685c6dc00c74cf6bcf74237314eae9ea7213a06f Mon Sep 17 00:00:00 2001 From: Julien Salleyron Date: Fri, 13 Sep 2019 20:00:06 +0200 Subject: [PATCH] Add weighted round robin load balancer on TCP Co-authored-by: Mathieu Lonjaret --- docs/content/routing/services/index.md | 62 ++++++++- integration/fixtures/tcp/wrr.toml | 42 ++++++ integration/rest_test.go | 4 +- integration/tcp_test.go | 33 +++++ pkg/api/handler_overview_test.go | 6 +- pkg/api/handler_tcp_test.go | 28 ++-- pkg/api/handler_test.go | 2 +- pkg/config/dynamic/http_config.go | 2 +- pkg/config/dynamic/tcp_config.go | 34 ++++- pkg/config/dynamic/zz_generated.deepcopy.go | 103 ++++++++++---- pkg/config/label/label_test.go | 8 +- pkg/config/runtime/runtime_test.go | 8 +- pkg/provider/docker/config.go | 4 +- pkg/provider/docker/config_test.go | 12 +- .../fixtures/tcp/with_termination_delay.yml | 16 +++ .../crd/fixtures/tcp/with_two_services.yml | 3 +- .../kubernetes/crd/kubernetes_http.go | 2 + pkg/provider/kubernetes/crd/kubernetes_tcp.go | 99 ++++++++----- .../kubernetes/crd/kubernetes_test.go | 109 +++++++++++---- .../crd/traefik/v1alpha1/ingressroutetcp.go | 6 +- .../traefik/v1alpha1/zz_generated.deepcopy.go | 14 +- pkg/provider/marathon/config.go | 2 +- pkg/provider/marathon/config_test.go | 10 +- pkg/provider/rancher/config.go | 4 +- pkg/provider/rancher/config_test.go | 12 +- pkg/server/router/tcp/router_test.go | 8 +- pkg/server/service/service.go | 4 +- pkg/server/service/tcp/service.go | 65 ++++++--- pkg/server/service/tcp/service_test.go | 18 +-- pkg/tcp/proxy.go | 5 +- pkg/tcp/rr_load_balancer.go | 48 ------- pkg/tcp/wrr_load_balancer.go | 122 ++++++++++++++++ pkg/tcp/wrr_load_balancer_test.go | 131 ++++++++++++++++++ 33 files changed, 787 insertions(+), 239 deletions(-) create mode 100644 integration/fixtures/tcp/wrr.toml create mode 100644 pkg/provider/kubernetes/crd/fixtures/tcp/with_termination_delay.yml delete mode 100644 pkg/tcp/rr_load_balancer.go create mode 100644 pkg/tcp/wrr_load_balancer.go create mode 100644 pkg/tcp/wrr_load_balancer_test.go diff --git a/docs/content/routing/services/index.md b/docs/content/routing/services/index.md index 0e9e109df..fb71898eb 100644 --- a/docs/content/routing/services/index.md +++ b/docs/content/routing/services/index.md @@ -404,13 +404,14 @@ http: ### General -Currently, `LoadBalancer` is the only supported kind of TCP `Service`. -However, since Traefik is an ever evolving project, other kind of TCP Services will be available in the future, -reason why you have to specify it. +Each of the fields of the service section represents a kind of service. +Which means, that for each specified service, one of the fields, and only one, +has to be enabled to define what kind of service is created. +Currently, the two available kinds are `LoadBalancer`, and `Weighted`. -### Load Balancer +### Servers Load Balancer -The load balancers are able to load balance the requests between multiple instances of your programs. +The servers load balancer is in charge of balancing the requests between the servers of the same service. ??? example "Declaring a Service with Two Servers -- Using the [File Provider](../../providers/file.md)" @@ -486,3 +487,54 @@ A negative value means an infinite deadline (i.e. the connection is never fully loadBalancer: terminationDelay: 200 ``` + +### Weighted + +The Weighted Round Robin (alias `WRR`) load-balancer of services is in charge of balancing the requests between multiple services based on provided weights. + +This strategy is only available to load balance between [services](./index.md) and not between [servers](./index.md#servers). + +This strategy can only be defined with [File](../../providers/file.md). + +```toml tab="TOML" +[tcp.services] + [tcp.services.app] + [[tcp.services.app.weighted.services]] + name = "appv1" + weight = 3 + [[tcp.services.app.weighted.services]] + name = "appv2" + weight = 1 + + [tcp.services.appv1] + [tcp.services.appv1.loadBalancer] + [[tcp.services.appv1.loadBalancer.servers]] + address = "private-ip-server-1/:8080" + + [tcp.services.appv2] + [tcp.services.appv2.loadBalancer] + [[tcp.services.appv2.loadBalancer.servers]] + address = "private-ip-server-2/:8080" +``` + +```yaml tab="YAML" +tcp: + services: + app: + weighted: + services: + - name: appv1 + weight: 3 + - name: appv2 + weight: 1 + + appv1: + loadBalancer: + servers: + - address: "xxx.xxx.xxx.xxx:8080" + + appv2: + loadBalancer: + servers: + - address: "xxx.xxx.xxx.xxx:8080" +``` diff --git a/integration/fixtures/tcp/wrr.toml b/integration/fixtures/tcp/wrr.toml new file mode 100644 index 000000000..90cea1a00 --- /dev/null +++ b/integration/fixtures/tcp/wrr.toml @@ -0,0 +1,42 @@ +[global] + checkNewVersion = false + sendAnonymousUsage = false + +[log] + level = "DEBUG" + +[entryPoints] + [entryPoints.tcp] + address = ":8093" + +[api] + insecure = true + +[providers.file] + filename = "{{ .SelfFilename }}" + +## dynamic configuration ## +[tcp] + [tcp.routers] + [tcp.routers.to-whoami-a] + rule = "HostSNI(`whoami-a.test`)" + service = "whoami" + entryPoints = [ "tcp" ] + [tcp.routers.to-whoami-a.tls] + passthrough=true + + [[tcp.services.whoami.weighted.services]] + name="whoami-a" + weight=3 + [[tcp.services.whoami.weighted.services]] + name="whoami-b" + weight=1 + + [tcp.services.whoami-a.loadBalancer] + [[tcp.services.whoami-a.loadBalancer.servers]] + address = "localhost:8081" + + [tcp.services.whoami-b.loadBalancer] + [[tcp.services.whoami-b.loadBalancer.servers]] + address = "localhost:8082" + diff --git a/integration/rest_test.go b/integration/rest_test.go index d87092571..0b6fa731f 100644 --- a/integration/rest_test.go +++ b/integration/rest_test.go @@ -79,7 +79,7 @@ func (s *RestSuite) TestSimpleConfigurationInsecure(c *check.C) { }, Services: map[string]*dynamic.TCPService{ "service1": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: s.composeProject.Container(c, "whoami1").NetworkSettings.IPAddress + ":80", @@ -183,7 +183,7 @@ func (s *RestSuite) TestSimpleConfiguration(c *check.C) { }, Services: map[string]*dynamic.TCPService{ "service1": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: s.composeProject.Container(c, "whoami1").NetworkSettings.IPAddress + ":80", diff --git a/integration/tcp_test.go b/integration/tcp_test.go index c1fc0d57c..ba49b3fdb 100644 --- a/integration/tcp_test.go +++ b/integration/tcp_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "time" "github.com/containous/traefik/v2/integration/try" @@ -266,3 +267,35 @@ func guessWhoTLSMaxVersion(addr, serverName string, tlsCall bool, tlsMaxVersion return string(out[:n]), nil } + +func (s *TCPSuite) TestWRR(c *check.C) { + file := s.adaptFile(c, "fixtures/tcp/wrr.toml", struct{}{}) + defer os.Remove(file) + + cmd, display := s.traefikCmd(withConfigFile(file)) + defer display(c) + + err := cmd.Start() + c.Assert(err, checker.IsNil) + defer cmd.Process.Kill() + + err = try.GetRequest("http://127.0.0.1:8080/api/rawdata", 5*time.Second, try.StatusCodeIs(http.StatusOK), try.BodyContains("HostSNI")) + c.Assert(err, checker.IsNil) + + call := map[string]int{} + for i := 0; i < 4; i++ { + // Traefik passes through, termination handled by whoami-a + out, err := guessWho("127.0.0.1:8093", "whoami-a.test", true) + c.Assert(err, checker.IsNil) + switch { + case strings.Contains(out, "whoami-a"): + call["whoami-a"]++ + case strings.Contains(out, "whoami-b"): + call["whoami-b"]++ + default: + call["unknown"]++ + } + } + + c.Assert(call, checker.DeepEquals, map[string]int{"whoami-a": 3, "whoami-b": 1}) +} diff --git a/pkg/api/handler_overview_test.go b/pkg/api/handler_overview_test.go index e8eaa7c92..10ff7f693 100644 --- a/pkg/api/handler_overview_test.go +++ b/pkg/api/handler_overview_test.go @@ -136,7 +136,7 @@ func TestHandler_Overview(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "tcpfoo-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", @@ -148,7 +148,7 @@ func TestHandler_Overview(t *testing.T) { }, "tcpbar-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2", @@ -160,7 +160,7 @@ func TestHandler_Overview(t *testing.T) { }, "tcpfii-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2", diff --git a/pkg/api/handler_tcp_test.go b/pkg/api/handler_tcp_test.go index 798876de4..f033c0292 100644 --- a/pkg/api/handler_tcp_test.go +++ b/pkg/api/handler_tcp_test.go @@ -255,7 +255,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", @@ -268,7 +268,7 @@ func TestHandler_TCP(t *testing.T) { }, "baz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -281,7 +281,7 @@ func TestHandler_TCP(t *testing.T) { }, "foz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -307,7 +307,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", @@ -320,7 +320,7 @@ func TestHandler_TCP(t *testing.T) { }, "baz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -333,7 +333,7 @@ func TestHandler_TCP(t *testing.T) { }, "foz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -359,7 +359,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", @@ -372,7 +372,7 @@ func TestHandler_TCP(t *testing.T) { }, "baz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -385,7 +385,7 @@ func TestHandler_TCP(t *testing.T) { }, "foz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -411,7 +411,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", @@ -423,7 +423,7 @@ func TestHandler_TCP(t *testing.T) { }, "baz@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.2:2345", @@ -435,7 +435,7 @@ func TestHandler_TCP(t *testing.T) { }, "test@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.3:2345", @@ -459,7 +459,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", @@ -483,7 +483,7 @@ func TestHandler_TCP(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "bar@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:2345", diff --git a/pkg/api/handler_test.go b/pkg/api/handler_test.go index 7b506cd6c..3fe6d3649 100644 --- a/pkg/api/handler_test.go +++ b/pkg/api/handler_test.go @@ -91,7 +91,7 @@ func TestHandler_RawData(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "tcpfoo-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", diff --git a/pkg/config/dynamic/http_config.go b/pkg/config/dynamic/http_config.go index 7fc264018..ab56419bd 100644 --- a/pkg/config/dynamic/http_config.go +++ b/pkg/config/dynamic/http_config.go @@ -77,7 +77,7 @@ type WRRService struct { Weight *int `json:"weight,omitempty" toml:"weight,omitempty" yaml:"weight,omitempty"` } -// SetDefaults Default values for a ServersLoadBalancer. +// SetDefaults Default values for a WRRService. func (w *WRRService) SetDefaults() { defaultWeight := 1 w.Weight = &defaultWeight diff --git a/pkg/config/dynamic/tcp_config.go b/pkg/config/dynamic/tcp_config.go index d8ff19cb6..272da7b14 100644 --- a/pkg/config/dynamic/tcp_config.go +++ b/pkg/config/dynamic/tcp_config.go @@ -18,7 +18,29 @@ type TCPConfiguration struct { // TCPService holds a tcp service configuration (can only be of one type at the same time). type TCPService struct { - LoadBalancer *TCPLoadBalancerService `json:"loadBalancer,omitempty" toml:"loadBalancer,omitempty" yaml:"loadBalancer,omitempty"` + LoadBalancer *TCPServersLoadBalancer `json:"loadBalancer,omitempty" toml:"loadBalancer,omitempty" yaml:"loadBalancer,omitempty"` + Weighted *TCPWeightedRoundRobin `json:"weighted,omitempty" toml:"weighted,omitempty" yaml:"weighted,omitempty" label:"-"` +} + +// +k8s:deepcopy-gen=true + +// TCPWeightedRoundRobin is a weighted round robin tcp load-balancer of services. +type TCPWeightedRoundRobin struct { + Services []TCPWRRService `json:"services,omitempty" toml:"services,omitempty" yaml:"services,omitempty"` +} + +// +k8s:deepcopy-gen=true + +// TCPWRRService is a reference to a tcp service load-balanced with weighted round robin. +type TCPWRRService struct { + Name string `json:"name,omitempty" toml:"name,omitempty" yaml:"name,omitempty"` + Weight *int `json:"weight,omitempty" toml:"weight,omitempty" yaml:"weight,omitempty"` +} + +// SetDefaults Default values for a TCPWRRService. +func (w *TCPWRRService) SetDefaults() { + defaultWeight := 1 + w.Weight = &defaultWeight } // +k8s:deepcopy-gen=true @@ -43,8 +65,8 @@ type RouterTCPTLSConfig struct { // +k8s:deepcopy-gen=true -// TCPLoadBalancerService holds the LoadBalancerService configuration. -type TCPLoadBalancerService struct { +// TCPServersLoadBalancer holds the LoadBalancerService configuration. +type TCPServersLoadBalancer struct { // TerminationDelay, corresponds to the deadline that the proxy sets, after one // of its connected peers indicates it has closed the writing capability of its // connection, to close the reading capability as well, hence fully terminating the @@ -54,14 +76,14 @@ type TCPLoadBalancerService struct { Servers []TCPServer `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server"` } -// SetDefaults Default values for a TCPLoadBalancerService -func (l *TCPLoadBalancerService) SetDefaults() { +// SetDefaults Default values for a TCPServersLoadBalancer +func (l *TCPServersLoadBalancer) SetDefaults() { defaultTerminationDelay := 100 // in milliseconds l.TerminationDelay = &defaultTerminationDelay } // Mergeable tells if the given service is mergeable. -func (l *TCPLoadBalancerService) Mergeable(loadBalancer *TCPLoadBalancerService) bool { +func (l *TCPServersLoadBalancer) Mergeable(loadBalancer *TCPServersLoadBalancer) bool { savedServers := l.Servers defer func() { l.Servers = savedServers diff --git a/pkg/config/dynamic/zz_generated.deepcopy.go b/pkg/config/dynamic/zz_generated.deepcopy.go index 25d9d4807..36635949d 100644 --- a/pkg/config/dynamic/zz_generated.deepcopy.go +++ b/pkg/config/dynamic/zz_generated.deepcopy.go @@ -1152,32 +1152,6 @@ func (in *TCPConfiguration) DeepCopy() *TCPConfiguration { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TCPLoadBalancerService) DeepCopyInto(out *TCPLoadBalancerService) { - *out = *in - if in.TerminationDelay != nil { - in, out := &in.TerminationDelay, &out.TerminationDelay - *out = new(int) - **out = **in - } - if in.Servers != nil { - in, out := &in.Servers, &out.Servers - *out = make([]TCPServer, len(*in)) - copy(*out, *in) - } - return -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPLoadBalancerService. -func (in *TCPLoadBalancerService) DeepCopy() *TCPLoadBalancerService { - if in == nil { - return nil - } - out := new(TCPLoadBalancerService) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TCPRouter) DeepCopyInto(out *TCPRouter) { *out = *in @@ -1220,12 +1194,43 @@ func (in *TCPServer) DeepCopy() *TCPServer { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TCPServersLoadBalancer) DeepCopyInto(out *TCPServersLoadBalancer) { + *out = *in + if in.TerminationDelay != nil { + in, out := &in.TerminationDelay, &out.TerminationDelay + *out = new(int) + **out = **in + } + if in.Servers != nil { + in, out := &in.Servers, &out.Servers + *out = make([]TCPServer, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPServersLoadBalancer. +func (in *TCPServersLoadBalancer) DeepCopy() *TCPServersLoadBalancer { + if in == nil { + return nil + } + out := new(TCPServersLoadBalancer) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TCPService) DeepCopyInto(out *TCPService) { *out = *in if in.LoadBalancer != nil { in, out := &in.LoadBalancer, &out.LoadBalancer - *out = new(TCPLoadBalancerService) + *out = new(TCPServersLoadBalancer) + (*in).DeepCopyInto(*out) + } + if in.Weighted != nil { + in, out := &in.Weighted, &out.Weighted + *out = new(TCPWeightedRoundRobin) (*in).DeepCopyInto(*out) } return @@ -1241,6 +1246,50 @@ func (in *TCPService) DeepCopy() *TCPService { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TCPWRRService) DeepCopyInto(out *TCPWRRService) { + *out = *in + if in.Weight != nil { + in, out := &in.Weight, &out.Weight + *out = new(int) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPWRRService. +func (in *TCPWRRService) DeepCopy() *TCPWRRService { + if in == nil { + return nil + } + out := new(TCPWRRService) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TCPWeightedRoundRobin) DeepCopyInto(out *TCPWeightedRoundRobin) { + *out = *in + if in.Services != nil { + in, out := &in.Services, &out.Services + *out = make([]TCPWRRService, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TCPWeightedRoundRobin. +func (in *TCPWeightedRoundRobin) DeepCopy() *TCPWeightedRoundRobin { + if in == nil { + return nil + } + out := new(TCPWeightedRoundRobin) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLSCLientCertificateDNInfo) DeepCopyInto(out *TLSCLientCertificateDNInfo) { *out = *in diff --git a/pkg/config/label/label_test.go b/pkg/config/label/label_test.go index 989eb6be6..a88cb588d 100644 --- a/pkg/config/label/label_test.go +++ b/pkg/config/label/label_test.go @@ -208,7 +208,7 @@ func TestDecodeConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "Service0": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Port: "42", @@ -218,7 +218,7 @@ func TestDecodeConfiguration(t *testing.T) { }, }, "Service1": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Port: "42", @@ -614,7 +614,7 @@ func TestEncodeConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "Service0": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Port: "42", @@ -623,7 +623,7 @@ func TestEncodeConfiguration(t *testing.T) { }, }, "Service1": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Port: "42", diff --git a/pkg/config/runtime/runtime_test.go b/pkg/config/runtime/runtime_test.go index b2cbc6238..5ff0a97d6 100644 --- a/pkg/config/runtime/runtime_test.go +++ b/pkg/config/runtime/runtime_test.go @@ -488,7 +488,7 @@ func TestPopulateUsedBy(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "foo-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", @@ -522,7 +522,7 @@ func TestPopulateUsedBy(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "foo-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", @@ -598,7 +598,7 @@ func TestPopulateUsedBy(t *testing.T) { TCPServices: map[string]*runtime.TCPServiceInfo{ "foo-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", @@ -614,7 +614,7 @@ func TestPopulateUsedBy(t *testing.T) { }, "bar-service@myprovider": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1", diff --git a/pkg/provider/docker/config.go b/pkg/provider/docker/config.go index 9c3c18bf0..b0aeb4c5f 100644 --- a/pkg/provider/docker/config.go +++ b/pkg/provider/docker/config.go @@ -78,7 +78,7 @@ func (p *Provider) buildTCPServiceConfiguration(ctx context.Context, container d if len(configuration.Services) == 0 { configuration.Services = make(map[string]*dynamic.TCPService) - lb := &dynamic.TCPLoadBalancerService{} + lb := &dynamic.TCPServersLoadBalancer{} lb.SetDefaults() configuration.Services[serviceName] = &dynamic.TCPService{ LoadBalancer: lb, @@ -145,7 +145,7 @@ func (p *Provider) keepContainer(ctx context.Context, container dockerData) bool return true } -func (p *Provider) addServerTCP(ctx context.Context, container dockerData, loadBalancer *dynamic.TCPLoadBalancerService) error { +func (p *Provider) addServerTCP(ctx context.Context, container dockerData, loadBalancer *dynamic.TCPServersLoadBalancer) error { serverPort := "" if loadBalancer != nil && len(loadBalancer.Servers) > 0 { serverPort = loadBalancer.Servers[0].Port diff --git a/pkg/provider/docker/config_test.go b/pkg/provider/docker/config_test.go index 2d927c0d7..dcaa20d4a 100644 --- a/pkg/provider/docker/config_test.go +++ b/pkg/provider/docker/config_test.go @@ -2088,7 +2088,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "Test": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -2133,7 +2133,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "Test": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -2188,7 +2188,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -2264,7 +2264,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -2331,7 +2331,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -2377,7 +2377,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", diff --git a/pkg/provider/kubernetes/crd/fixtures/tcp/with_termination_delay.yml b/pkg/provider/kubernetes/crd/fixtures/tcp/with_termination_delay.yml new file mode 100644 index 000000000..1d942ab6a --- /dev/null +++ b/pkg/provider/kubernetes/crd/fixtures/tcp/with_termination_delay.yml @@ -0,0 +1,16 @@ +apiVersion: traefik.containo.us/v1alpha1 +kind: IngressRouteTCP +metadata: + name: test.route + namespace: default + +spec: + entryPoints: + - foo + + routes: + - match: HostSNI(`foo.com`) + services: + - name: whoamitcp + port: 8000 + terminationDelay: 500 diff --git a/pkg/provider/kubernetes/crd/fixtures/tcp/with_two_services.yml b/pkg/provider/kubernetes/crd/fixtures/tcp/with_two_services.yml index bc846568e..73f60f037 100644 --- a/pkg/provider/kubernetes/crd/fixtures/tcp/with_two_services.yml +++ b/pkg/provider/kubernetes/crd/fixtures/tcp/with_two_services.yml @@ -13,6 +13,7 @@ spec: services: - name: whoamitcp port: 8000 + weight: 2 - name: whoamitcp2 port: 8080 - + weight: 3 diff --git a/pkg/provider/kubernetes/crd/kubernetes_http.go b/pkg/provider/kubernetes/crd/kubernetes_http.go index 5144d330f..b6835bce1 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_http.go +++ b/pkg/provider/kubernetes/crd/kubernetes_http.go @@ -73,6 +73,8 @@ func (p *Provider) loadIngressRouteConfiguration(ctx context.Context, client Cli continue } + // If there is only one service defined, we skip the creation of the load balancer of services, + // i.e. the service on top is directly a load balancer of servers. if len(route.Services) == 1 { conf.Services[serviceName] = balancerServerHTTP break diff --git a/pkg/provider/kubernetes/crd/kubernetes_tcp.go b/pkg/provider/kubernetes/crd/kubernetes_tcp.go index 33744a95b..a5a6b74e8 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_tcp.go +++ b/pkg/provider/kubernetes/crd/kubernetes_tcp.go @@ -49,9 +49,16 @@ func (p *Provider) loadIngressRouteTCPConfiguration(ctx context.Context, client continue } - var allServers []dynamic.TCPServer + key, err := makeServiceKey(route.Match, ingressName) + if err != nil { + logger.Error(err) + continue + } + + serviceName := makeID(ingressRouteTCP.Namespace, key) + for _, service := range route.Services { - servers, err := loadTCPServers(client, ingressRouteTCP.Namespace, service) + balancerServerTCP, err := createLoadBalancerServerTCP(client, ingressRouteTCP.Namespace, service) if err != nil { logger. WithField("serviceName", service.Name). @@ -60,16 +67,28 @@ func (p *Provider) loadIngressRouteTCPConfiguration(ctx context.Context, client continue } - allServers = append(allServers, servers...) + // If there is only one service defined, we skip the creation of the load balancer of services, + // i.e. the service on top is directly a load balancer of servers. + if len(route.Services) == 1 { + conf.Services[serviceName] = balancerServerTCP + break + } + + serviceKey := fmt.Sprintf("%s-%s-%d", serviceName, service.Name, service.Port) + conf.Services[serviceKey] = balancerServerTCP + + srv := dynamic.TCPWRRService{Name: serviceKey} + srv.SetDefaults() + if service.Weight != nil { + srv.Weight = service.Weight + } + + if conf.Services[serviceName] == nil { + conf.Services[serviceName] = &dynamic.TCPService{Weighted: &dynamic.TCPWeightedRoundRobin{}} + } + conf.Services[serviceName].Weighted.Services = append(conf.Services[serviceName].Weighted.Services, srv) } - key, e := makeServiceKey(route.Match, ingressName) - if e != nil { - logger.Error(e) - continue - } - - serviceName := makeID(ingressRouteTCP.Namespace, key) conf.Routers[serviceName] = &dynamic.TCPRouter{ EntryPoints: ingressRouteTCP.Spec.EntryPoints, Rule: route.Match, @@ -83,37 +102,53 @@ func (p *Provider) loadIngressRouteTCPConfiguration(ctx context.Context, client Domains: ingressRouteTCP.Spec.TLS.Domains, } - if ingressRouteTCP.Spec.TLS.Options != nil && len(ingressRouteTCP.Spec.TLS.Options.Name) > 0 { - tlsOptionsName := ingressRouteTCP.Spec.TLS.Options.Name - // Is a Kubernetes CRD reference (i.e. not a cross-provider reference) - ns := ingressRouteTCP.Spec.TLS.Options.Namespace - if !strings.Contains(tlsOptionsName, "@") { - if len(ns) == 0 { - ns = ingressRouteTCP.Namespace - } - tlsOptionsName = makeID(ns, tlsOptionsName) - } else if len(ns) > 0 { - logger. - WithField("TLSoptions", ingressRouteTCP.Spec.TLS.Options.Name). - Warnf("namespace %q is ignored in cross-provider context", ns) - } - - conf.Routers[serviceName].TLS.Options = tlsOptionsName - + if ingressRouteTCP.Spec.TLS.Options == nil || len(ingressRouteTCP.Spec.TLS.Options.Name) == 0 { + continue } + + tlsOptionsName := ingressRouteTCP.Spec.TLS.Options.Name + // Is a Kubernetes CRD reference (i.e. not a cross-provider reference) + ns := ingressRouteTCP.Spec.TLS.Options.Namespace + if !strings.Contains(tlsOptionsName, "@") { + if len(ns) == 0 { + ns = ingressRouteTCP.Namespace + } + tlsOptionsName = makeID(ns, tlsOptionsName) + } else if len(ns) > 0 { + logger. + WithField("TLSoptions", ingressRouteTCP.Spec.TLS.Options.Name). + Warnf("namespace %q is ignored in cross-provider context", ns) + } + + conf.Routers[serviceName].TLS.Options = tlsOptionsName + } - conf.Services[serviceName] = &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ - Servers: allServers, - }, - } } } return conf } +func createLoadBalancerServerTCP(client Client, namespace string, service v1alpha1.ServiceTCP) (*dynamic.TCPService, error) { + servers, err := loadTCPServers(client, namespace, service) + if err != nil { + return nil, err + } + + tcpService := &dynamic.TCPService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: servers, + }, + } + + if service.TerminationDelay != nil { + tcpService.LoadBalancer.TerminationDelay = service.TerminationDelay + } + + return tcpService, nil +} + func loadTCPServers(client Client, namespace string, svc v1alpha1.ServiceTCP) ([]dynamic.TCPServer, error) { service, exists, err := client.GetService(namespace, svc.Name) if err != nil { diff --git a/pkg/provider/kubernetes/crd/kubernetes_test.go b/pkg/provider/kubernetes/crd/kubernetes_test.go index 03ceb3de2..dbdb7ef85 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_test.go +++ b/pkg/provider/kubernetes/crd/kubernetes_test.go @@ -55,7 +55,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -92,7 +92,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -106,7 +106,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, }, "default/test.route-f44ce589164e656d231c": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -130,7 +130,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, }, { - desc: "One ingress Route with two different services, their servers will merge", + desc: "One ingress Route with two different services", paths: []string{"tcp/services.yml", "tcp/with_two_services.yml"}, expected: &dynamic.Configuration{ TCP: &dynamic.TCPConfiguration{ @@ -143,27 +143,44 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ - Servers: []dynamic.TCPServer{ + Weighted: &dynamic.TCPWeightedRoundRobin{ + Services: []dynamic.TCPWRRService{ { - Address: "10.10.0.1:8000", - Port: "", + Name: "default/test.route-fdd3e9338e47a45efefc-whoamitcp-8000", + Weight: func(i int) *int { return &i }(2), }, { - Address: "10.10.0.2:8000", - Port: "", - }, - { - Address: "10.10.0.3:8080", - Port: "", - }, - { - Address: "10.10.0.4:8080", - Port: "", + Name: "default/test.route-fdd3e9338e47a45efefc-whoamitcp2-8080", + Weight: func(i int) *int { return &i }(3), }, }, }, - }}, + }, + "default/test.route-fdd3e9338e47a45efefc-whoamitcp-8000": { + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: []dynamic.TCPServer{ + { + Address: "10.10.0.1:8000", + }, + { + Address: "10.10.0.2:8000", + }, + }, + }, + }, + "default/test.route-fdd3e9338e47a45efefc-whoamitcp2-8080": { + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: []dynamic.TCPServer{ + { + Address: "10.10.0.3:8080", + }, + { + Address: "10.10.0.4:8080", + }, + }, + }, + }, + }, }, HTTP: &dynamic.HTTPConfiguration{ Routers: map[string]*dynamic.Router{}, @@ -247,7 +264,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -286,7 +303,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -345,7 +362,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -403,7 +420,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -460,7 +477,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -506,7 +523,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -552,7 +569,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -589,7 +606,7 @@ func TestLoadIngressRouteTCPs(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "default/test.route-fdd3e9338e47a45efefc": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "10.10.0.1:8000", @@ -612,6 +629,44 @@ func TestLoadIngressRouteTCPs(t *testing.T) { TLS: &dynamic.TLSConfiguration{}, }, }, + { + desc: "TCP with terminationDelay", + paths: []string{"tcp/services.yml", "tcp/with_termination_delay.yml"}, + expected: &dynamic.Configuration{ + TLS: &dynamic.TLSConfiguration{}, + TCP: &dynamic.TCPConfiguration{ + Routers: map[string]*dynamic.TCPRouter{ + "default/test.route-fdd3e9338e47a45efefc": { + EntryPoints: []string{"foo"}, + Service: "default/test.route-fdd3e9338e47a45efefc", + Rule: "HostSNI(`foo.com`)", + }, + }, + Services: map[string]*dynamic.TCPService{ + "default/test.route-fdd3e9338e47a45efefc": { + LoadBalancer: &dynamic.TCPServersLoadBalancer{ + Servers: []dynamic.TCPServer{ + { + Address: "10.10.0.1:8000", + Port: "", + }, + { + Address: "10.10.0.2:8000", + Port: "", + }, + }, + TerminationDelay: Int(500), + }, + }, + }, + }, + HTTP: &dynamic.HTTPConfiguration{ + Routers: map[string]*dynamic.Router{}, + Middlewares: map[string]*dynamic.Middleware{}, + Services: map[string]*dynamic.Service{}, + }, + }, + }, } for _, test := range testCases { diff --git a/pkg/provider/kubernetes/crd/traefik/v1alpha1/ingressroutetcp.go b/pkg/provider/kubernetes/crd/traefik/v1alpha1/ingressroutetcp.go index dde55c694..72c9a7d0e 100644 --- a/pkg/provider/kubernetes/crd/traefik/v1alpha1/ingressroutetcp.go +++ b/pkg/provider/kubernetes/crd/traefik/v1alpha1/ingressroutetcp.go @@ -44,8 +44,10 @@ type TLSOptionTCPRef struct { // ServiceTCP defines an upstream to proxy traffic. type ServiceTCP struct { - Name string `json:"name"` - Port int32 `json:"port"` + Name string `json:"name"` + Port int32 `json:"port"` + Weight *int `json:"weight,omitempty"` + TerminationDelay *int `json:"terminationDelay,omitempty"` } // +genclient diff --git a/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go b/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go index 85d073da4..3f6770cbe 100644 --- a/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/provider/kubernetes/crd/traefik/v1alpha1/zz_generated.deepcopy.go @@ -612,7 +612,9 @@ func (in *RouteTCP) DeepCopyInto(out *RouteTCP) { if in.Services != nil { in, out := &in.Services, &out.Services *out = make([]ServiceTCP, len(*in)) - copy(*out, *in) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } } return } @@ -666,6 +668,16 @@ func (in *Service) DeepCopy() *Service { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ServiceTCP) DeepCopyInto(out *ServiceTCP) { *out = *in + if in.Weight != nil { + in, out := &in.Weight, &out.Weight + *out = new(int) + **out = **in + } + if in.TerminationDelay != nil { + in, out := &in.TerminationDelay, &out.TerminationDelay + *out = new(int) + **out = **in + } return } diff --git a/pkg/provider/marathon/config.go b/pkg/provider/marathon/config.go index 08a767af4..e2f81c163 100644 --- a/pkg/provider/marathon/config.go +++ b/pkg/provider/marathon/config.go @@ -140,7 +140,7 @@ func (p *Provider) buildTCPServiceConfiguration(ctx context.Context, app maratho if len(conf.Services) == 0 { conf.Services = make(map[string]*dynamic.TCPService) - lb := &dynamic.TCPLoadBalancerService{} + lb := &dynamic.TCPServersLoadBalancer{} lb.SetDefaults() conf.Services[appName] = &dynamic.TCPService{ LoadBalancer: lb, diff --git a/pkg/provider/marathon/config_test.go b/pkg/provider/marathon/config_test.go index c2dc92b30..83c25736c 100644 --- a/pkg/provider/marathon/config_test.go +++ b/pkg/provider/marathon/config_test.go @@ -1236,7 +1236,7 @@ func TestBuildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "app": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "localhost:80", @@ -1268,7 +1268,7 @@ func TestBuildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "app": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "localhost:80", @@ -1308,7 +1308,7 @@ func TestBuildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "localhost:8080", @@ -1349,7 +1349,7 @@ func TestBuildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "localhost:8080", @@ -1390,7 +1390,7 @@ func TestBuildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "localhost:8080", diff --git a/pkg/provider/rancher/config.go b/pkg/provider/rancher/config.go index e2787d96d..b6bfa9ad8 100644 --- a/pkg/provider/rancher/config.go +++ b/pkg/provider/rancher/config.go @@ -74,7 +74,7 @@ func (p *Provider) buildTCPServiceConfiguration(ctx context.Context, service ran if len(configuration.Services) == 0 { configuration.Services = make(map[string]*dynamic.TCPService) - lb := &dynamic.TCPLoadBalancerService{} + lb := &dynamic.TCPServersLoadBalancer{} lb.SetDefaults() configuration.Services[serviceName] = &dynamic.TCPService{ LoadBalancer: lb, @@ -146,7 +146,7 @@ func (p *Provider) keepService(ctx context.Context, service rancherData) bool { return true } -func (p *Provider) addServerTCP(ctx context.Context, service rancherData, loadBalancer *dynamic.TCPLoadBalancerService) error { +func (p *Provider) addServerTCP(ctx context.Context, service rancherData, loadBalancer *dynamic.TCPServersLoadBalancer) error { log.FromContext(ctx).Debugf("Trying to add servers for service %s \n", service.Name) serverPort := "" diff --git a/pkg/provider/rancher/config_test.go b/pkg/provider/rancher/config_test.go index 14d172f35..0e771b190 100644 --- a/pkg/provider/rancher/config_test.go +++ b/pkg/provider/rancher/config_test.go @@ -508,7 +508,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "Test": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -545,7 +545,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "Test": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -588,7 +588,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -634,7 +634,7 @@ func Test_buildConfiguration(t *testing.T) { }, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -693,7 +693,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", @@ -731,7 +731,7 @@ func Test_buildConfiguration(t *testing.T) { Routers: map[string]*dynamic.TCPRouter{}, Services: map[string]*dynamic.TCPService{ "foo": { - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:8080", diff --git a/pkg/server/router/tcp/router_test.go b/pkg/server/router/tcp/router_test.go index d55aa98b0..935bbffe4 100644 --- a/pkg/server/router/tcp/router_test.go +++ b/pkg/server/router/tcp/router_test.go @@ -23,7 +23,7 @@ func TestRuntimeConfiguration(t *testing.T) { serviceConfig: map[string]*runtime.TCPServiceInfo{ "foo-service": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Port: "8085", @@ -70,7 +70,7 @@ func TestRuntimeConfiguration(t *testing.T) { serviceConfig: map[string]*runtime.TCPServiceInfo{ "foo-service": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -104,7 +104,7 @@ func TestRuntimeConfiguration(t *testing.T) { serviceConfig: map[string]*runtime.TCPServiceInfo{ "foo-service": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", @@ -137,7 +137,7 @@ func TestRuntimeConfiguration(t *testing.T) { serviceConfig: map[string]*runtime.TCPServiceInfo{ "foo-service": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "127.0.0.1:80", diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go index bb030889a..b153f30ac 100644 --- a/pkg/server/service/service.go +++ b/pkg/server/service/service.go @@ -93,7 +93,9 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons } } if count > 1 { - return nil, errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead") + err := errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead") + conf.AddError(err, true) + return nil, err } var lb http.Handler diff --git a/pkg/server/service/tcp/service.go b/pkg/server/service/tcp/service.go index 2495e2551..ca52ad72a 100644 --- a/pkg/server/service/tcp/service.go +++ b/pkg/server/service/tcp/service.go @@ -2,6 +2,7 @@ package tcp import ( "context" + "errors" "fmt" "net" "time" @@ -30,41 +31,59 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han ctx := internal.AddProviderInContext(rootCtx, serviceQualifiedName) ctx = log.With(ctx, log.Str(log.ServiceName, serviceName)) - // FIXME Check if the service is declared multiple times with different types conf, ok := m.configs[serviceQualifiedName] if !ok { return nil, fmt.Errorf("the service %q does not exist", serviceQualifiedName) } - if conf.LoadBalancer == nil { - err := fmt.Errorf("the service %q doesn't have any TCP load balancer", serviceQualifiedName) + + if conf.LoadBalancer != nil && conf.Weighted != nil { + err := errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead") conf.AddError(err, true) return nil, err } logger := log.FromContext(ctx) + switch { + case conf.LoadBalancer != nil: + loadBalancer := tcp.NewWRRLoadBalancer() - loadBalancer := tcp.NewRRLoadBalancer() - - if conf.LoadBalancer.TerminationDelay == nil { - defaultTerminationDelay := 100 - conf.LoadBalancer.TerminationDelay = &defaultTerminationDelay - } - duration := time.Millisecond * time.Duration(*conf.LoadBalancer.TerminationDelay) - - for name, server := range conf.LoadBalancer.Servers { - if _, _, err := net.SplitHostPort(server.Address); err != nil { - logger.Errorf("In service %q: %v", serviceQualifiedName, err) - continue + if conf.LoadBalancer.TerminationDelay == nil { + defaultTerminationDelay := 100 + conf.LoadBalancer.TerminationDelay = &defaultTerminationDelay } + duration := time.Millisecond * time.Duration(*conf.LoadBalancer.TerminationDelay) - handler, err := tcp.NewProxy(server.Address, duration) - if err != nil { - logger.Errorf("In service %q server %q: %v", serviceQualifiedName, server.Address, err) - continue + for name, server := range conf.LoadBalancer.Servers { + if _, _, err := net.SplitHostPort(server.Address); err != nil { + logger.Errorf("In service %q: %v", serviceQualifiedName, err) + continue + } + + handler, err := tcp.NewProxy(server.Address, duration) + if err != nil { + logger.Errorf("In service %q server %q: %v", serviceQualifiedName, server.Address, err) + continue + } + + loadBalancer.AddServer(handler) + logger.WithField(log.ServerName, name).Debugf("Creating TCP server %d at %s", name, server.Address) } - - loadBalancer.AddServer(handler) - logger.WithField(log.ServerName, name).Debugf("Creating TCP server %d at %s", name, server.Address) + return loadBalancer, nil + case conf.Weighted != nil: + loadBalancer := tcp.NewWRRLoadBalancer() + for _, service := range conf.Weighted.Services { + handler, err := m.BuildTCP(rootCtx, service.Name) + if err != nil { + logger.Errorf("In service %q: %v", serviceQualifiedName, err) + return nil, err + } + loadBalancer.AddWeightServer(handler, service.Weight) + } + return loadBalancer, nil + default: + err := fmt.Errorf("the service %q doesn't have any TCP load balancer", serviceQualifiedName) + conf.AddError(err, true) + return nil, err } - return loadBalancer, nil + } diff --git a/pkg/server/service/tcp/service_test.go b/pkg/server/service/tcp/service_test.go index 8d680f11f..495602f08 100644 --- a/pkg/server/service/tcp/service_test.go +++ b/pkg/server/service/tcp/service_test.go @@ -41,7 +41,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "test": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ {Address: "test:31"}, }, @@ -56,7 +56,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "test": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ {Address: "foobar"}, }, @@ -71,7 +71,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{}, + LoadBalancer: &dynamic.TCPServersLoadBalancer{}, }, }, }, @@ -82,7 +82,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{}, + LoadBalancer: &dynamic.TCPServersLoadBalancer{}, }, }, }, @@ -93,7 +93,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{}, + LoadBalancer: &dynamic.TCPServersLoadBalancer{}, }, }, }, @@ -105,7 +105,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "foobar.com:80", @@ -123,7 +123,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "192.168.0.12:80", @@ -141,7 +141,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "foobar.com", @@ -159,7 +159,7 @@ func TestManager_BuildTCP(t *testing.T) { configs: map[string]*runtime.TCPServiceInfo{ "serviceName@provider-1": { TCPService: &dynamic.TCPService{ - LoadBalancer: &dynamic.TCPLoadBalancerService{ + LoadBalancer: &dynamic.TCPServersLoadBalancer{ Servers: []dynamic.TCPServer{ { Address: "192.168.0.12", diff --git a/pkg/tcp/proxy.go b/pkg/tcp/proxy.go index 2e7934415..0764fea5c 100644 --- a/pkg/tcp/proxy.go +++ b/pkg/tcp/proxy.go @@ -58,13 +58,14 @@ func (p Proxy) connCopy(dst, src WriteCloser, errCh chan error) { errClose := dst.CloseWrite() if errClose != nil { - log.WithoutContext().Errorf("Error while terminating connection: %v", errClose) + log.WithoutContext().Debugf("Error while terminating connection: %v", errClose) + return } if p.terminationDelay >= 0 { err := dst.SetReadDeadline(time.Now().Add(p.terminationDelay)) if err != nil { - log.WithoutContext().Errorf("Error while setting deadline: %v", err) + log.WithoutContext().Debugf("Error while setting deadline: %v", err) } } } diff --git a/pkg/tcp/rr_load_balancer.go b/pkg/tcp/rr_load_balancer.go deleted file mode 100644 index eb5773ae2..000000000 --- a/pkg/tcp/rr_load_balancer.go +++ /dev/null @@ -1,48 +0,0 @@ -package tcp - -import ( - "sync" - - "github.com/containous/traefik/v2/pkg/log" -) - -// RRLoadBalancer is a naive RoundRobin load balancer for TCP services -type RRLoadBalancer struct { - servers []Handler - lock sync.RWMutex - current int -} - -// NewRRLoadBalancer creates a new RRLoadBalancer -func NewRRLoadBalancer() *RRLoadBalancer { - return &RRLoadBalancer{} -} - -// ServeTCP forwards the connection to the right service -func (r *RRLoadBalancer) ServeTCP(conn WriteCloser) { - if len(r.servers) == 0 { - log.WithoutContext().Error("no available server") - return - } - - r.next().ServeTCP(conn) -} - -// AddServer appends a server to the existing list -func (r *RRLoadBalancer) AddServer(server Handler) { - r.servers = append(r.servers, server) -} - -func (r *RRLoadBalancer) next() Handler { - r.lock.Lock() - defer r.lock.Unlock() - - if r.current >= len(r.servers) { - r.current = 0 - log.WithoutContext().Debugf("Load balancer: going back to the first available server") - } - - handler := r.servers[r.current] - r.current++ - return handler -} diff --git a/pkg/tcp/wrr_load_balancer.go b/pkg/tcp/wrr_load_balancer.go new file mode 100644 index 000000000..bfa69d924 --- /dev/null +++ b/pkg/tcp/wrr_load_balancer.go @@ -0,0 +1,122 @@ +package tcp + +import ( + "fmt" + "sync" + + "github.com/containous/traefik/v2/pkg/log" +) + +type server struct { + Handler + weight int +} + +// WRRLoadBalancer is a naive RoundRobin load balancer for TCP services +type WRRLoadBalancer struct { + servers []server + lock sync.RWMutex + currentWeight int + index int +} + +// NewWRRLoadBalancer creates a new WRRLoadBalancer +func NewWRRLoadBalancer() *WRRLoadBalancer { + return &WRRLoadBalancer{ + index: -1, + } +} + +// ServeTCP forwards the connection to the right service +func (b *WRRLoadBalancer) ServeTCP(conn WriteCloser) { + if len(b.servers) == 0 { + log.WithoutContext().Error("no available server") + return + } + + next, err := b.next() + if err != nil { + log.WithoutContext().Errorf("Error during load balancing: %v", err) + conn.Close() + } + next.ServeTCP(conn) +} + +// AddServer appends a server to the existing list +func (b *WRRLoadBalancer) AddServer(serverHandler Handler) { + w := 1 + b.AddWeightServer(serverHandler, &w) +} + +// AddWeightServer appends a server to the existing list with a weight +func (b *WRRLoadBalancer) AddWeightServer(serverHandler Handler, weight *int) { + w := 1 + if weight != nil { + w = *weight + } + b.servers = append(b.servers, server{Handler: serverHandler, weight: w}) +} + +func (b *WRRLoadBalancer) maxWeight() int { + max := -1 + for _, s := range b.servers { + if s.weight > max { + max = s.weight + } + } + return max +} + +func (b *WRRLoadBalancer) weightGcd() int { + divisor := -1 + for _, s := range b.servers { + if divisor == -1 { + divisor = s.weight + } else { + divisor = gcd(divisor, s.weight) + } + } + return divisor +} + +func gcd(a, b int) int { + for b != 0 { + a, b = b, a%b + } + return a +} + +func (b *WRRLoadBalancer) next() (Handler, error) { + b.lock.Lock() + defer b.lock.Unlock() + + if len(b.servers) == 0 { + return nil, fmt.Errorf("no servers in the pool") + } + + // The algo below may look messy, but is actually very simple + // it calculates the GCD and subtracts it on every iteration, what interleaves servers + // and allows us not to build an iterator every time we readjust weights + + // GCD across all enabled servers + gcd := b.weightGcd() + // Maximum weight across all enabled servers + max := b.maxWeight() + + for { + b.index = (b.index + 1) % len(b.servers) + if b.index == 0 { + b.currentWeight -= gcd + if b.currentWeight <= 0 { + b.currentWeight = max + if b.currentWeight == 0 { + return nil, fmt.Errorf("all servers have 0 weight") + } + } + } + srv := b.servers[b.index] + if srv.weight >= b.currentWeight { + return srv, nil + } + } +} diff --git a/pkg/tcp/wrr_load_balancer_test.go b/pkg/tcp/wrr_load_balancer_test.go new file mode 100644 index 000000000..933f48976 --- /dev/null +++ b/pkg/tcp/wrr_load_balancer_test.go @@ -0,0 +1,131 @@ +package tcp + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeConn struct { + call map[string]int +} + +func (f *fakeConn) Read(b []byte) (n int, err error) { + panic("implement me") +} + +func (f *fakeConn) Write(b []byte) (n int, err error) { + f.call[string(b)]++ + return len(b), nil +} + +func (f *fakeConn) Close() error { + panic("implement me") +} + +func (f *fakeConn) LocalAddr() net.Addr { + panic("implement me") +} + +func (f *fakeConn) RemoteAddr() net.Addr { + panic("implement me") +} + +func (f *fakeConn) SetDeadline(t time.Time) error { + panic("implement me") +} + +func (f *fakeConn) SetReadDeadline(t time.Time) error { + panic("implement me") +} + +func (f *fakeConn) SetWriteDeadline(t time.Time) error { + panic("implement me") +} + +func (f *fakeConn) CloseWrite() error { + panic("implement me") +} + +func TestLoadBalancing(t *testing.T) { + testCases := []struct { + desc string + serversWeight map[string]int + totalCall int + expected map[string]int + }{ + { + desc: "RoundRobin", + serversWeight: map[string]int{ + "h1": 1, + "h2": 1, + }, + totalCall: 4, + expected: map[string]int{ + "h1": 2, + "h2": 2, + }, + }, + { + desc: "WeighedRoundRobin", + serversWeight: map[string]int{ + "h1": 3, + "h2": 1, + }, + totalCall: 4, + expected: map[string]int{ + "h1": 3, + "h2": 1, + }, + }, + { + desc: "WeighedRoundRobin with more call", + serversWeight: map[string]int{ + "h1": 3, + "h2": 1, + }, + totalCall: 16, + expected: map[string]int{ + "h1": 12, + "h2": 4, + }, + }, + { + desc: "WeighedRoundRobin with 0 weight server", + serversWeight: map[string]int{ + "h1": 3, + "h2": 0, + }, + totalCall: 16, + expected: map[string]int{ + "h1": 16, + }, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + balancer := NewWRRLoadBalancer() + for server, weight := range test.serversWeight { + server := server + balancer.AddWeightServer(HandlerFunc(func(conn WriteCloser) { + _, err := conn.Write([]byte(server)) + require.NoError(t, err) + }), &weight) + } + + conn := &fakeConn{call: make(map[string]int)} + for i := 0; i < test.totalCall; i++ { + balancer.ServeTCP(conn) + } + + assert.Equal(t, test.expected, conn.call) + }) + } +}