From 033fccccc7552de4c41aa906c86bf27d072aebe6 Mon Sep 17 00:00:00 2001 From: jjacque Date: Tue, 20 Sep 2022 16:54:08 +0200 Subject: [PATCH] Support gRPC healthcheck --- .../dynamic-configuration/docker-labels.yml | 1 + .../reference/dynamic-configuration/file.toml | 1 + .../reference/dynamic-configuration/file.yaml | 1 + .../reference/dynamic-configuration/kv-ref.md | 1 + .../marathon-labels.json | 1 + docs/content/routing/services/index.md | 4 +- pkg/config/dynamic/fixtures/sample.toml | 1 + pkg/config/dynamic/http_config.go | 2 + pkg/config/label/label_test.go | 4 + pkg/healthcheck/healthcheck.go | 77 ++++++- pkg/healthcheck/healthcheck_test.go | 170 ++++++--------- pkg/healthcheck/mock_test.go | 205 ++++++++++++++++++ pkg/provider/kv/kv_test.go | 2 + pkg/server/service/service.go | 11 + 14 files changed, 374 insertions(+), 107 deletions(-) create mode 100644 pkg/healthcheck/mock_test.go diff --git a/docs/content/reference/dynamic-configuration/docker-labels.yml b/docs/content/reference/dynamic-configuration/docker-labels.yml index c413f27bd..f4ddee380 100644 --- a/docs/content/reference/dynamic-configuration/docker-labels.yml +++ b/docs/content/reference/dynamic-configuration/docker-labels.yml @@ -154,6 +154,7 @@ - "traefik.http.services.service01.loadbalancer.healthcheck.method=foobar" - "traefik.http.services.service01.loadbalancer.healthcheck.port=42" - "traefik.http.services.service01.loadbalancer.healthcheck.scheme=foobar" +- "traefik.http.services.service01.loadbalancer.healthcheck.mode=foobar" - "traefik.http.services.service01.loadbalancer.healthcheck.timeout=foobar" - "traefik.http.services.service01.loadbalancer.passhostheader=true" - "traefik.http.services.service01.loadbalancer.responseforwarding.flushinterval=foobar" diff --git a/docs/content/reference/dynamic-configuration/file.toml b/docs/content/reference/dynamic-configuration/file.toml index 895a1f97f..4634afea2 100644 --- a/docs/content/reference/dynamic-configuration/file.toml +++ 
b/docs/content/reference/dynamic-configuration/file.toml @@ -53,6 +53,7 @@ url = "foobar" [http.services.Service01.loadBalancer.healthCheck] scheme = "foobar" + mode = "foobar" path = "foobar" method = "foobar" port = 42 diff --git a/docs/content/reference/dynamic-configuration/file.yaml b/docs/content/reference/dynamic-configuration/file.yaml index ec3081778..807bc8a14 100644 --- a/docs/content/reference/dynamic-configuration/file.yaml +++ b/docs/content/reference/dynamic-configuration/file.yaml @@ -58,6 +58,7 @@ http: - url: foobar healthCheck: scheme: foobar + mode: foobar path: foobar method: foobar port: 42 diff --git a/docs/content/reference/dynamic-configuration/kv-ref.md b/docs/content/reference/dynamic-configuration/kv-ref.md index c12162ccc..f72e024e4 100644 --- a/docs/content/reference/dynamic-configuration/kv-ref.md +++ b/docs/content/reference/dynamic-configuration/kv-ref.md @@ -208,6 +208,7 @@ | `traefik/http/services/Service01/loadBalancer/healthCheck/hostname` | `foobar` | | `traefik/http/services/Service01/loadBalancer/healthCheck/interval` | `foobar` | | `traefik/http/services/Service01/loadBalancer/healthCheck/method` | `foobar` | +| `traefik/http/services/Service01/loadBalancer/healthCheck/mode` | `foobar` | | `traefik/http/services/Service01/loadBalancer/healthCheck/path` | `foobar` | | `traefik/http/services/Service01/loadBalancer/healthCheck/port` | `42` | | `traefik/http/services/Service01/loadBalancer/healthCheck/scheme` | `foobar` | diff --git a/docs/content/reference/dynamic-configuration/marathon-labels.json b/docs/content/reference/dynamic-configuration/marathon-labels.json index 5da004819..7966f27f7 100644 --- a/docs/content/reference/dynamic-configuration/marathon-labels.json +++ b/docs/content/reference/dynamic-configuration/marathon-labels.json @@ -154,6 +154,7 @@ "traefik.http.services.service01.loadbalancer.healthcheck.method": "foobar", "traefik.http.services.service01.loadbalancer.healthcheck.port": "42", 
"traefik.http.services.service01.loadbalancer.healthcheck.scheme": "foobar", +"traefik.http.services.service01.loadbalancer.healthcheck.mode": "foobar", "traefik.http.services.service01.loadbalancer.healthcheck.timeout": "foobar", "traefik.http.services.service01.loadbalancer.passhostheader": "true", "traefik.http.services.service01.loadbalancer.responseforwarding.flushinterval": "foobar", diff --git a/docs/content/routing/services/index.md b/docs/content/routing/services/index.md index f7b1e8a59..7186765f1 100644 --- a/docs/content/routing/services/index.md +++ b/docs/content/routing/services/index.md @@ -316,7 +316,8 @@ On subsequent requests, to keep the session alive with the same server, the clie #### Health Check Configure health check to remove unhealthy servers from the load balancing rotation. -Traefik will consider your servers healthy as long as they return status codes between `2XX` and `3XX` to the health check requests (carried out every `interval`). +Traefik will consider your HTTP(S) servers healthy as long as they return status codes between `2XX` and `3XX` to the health check requests (carried out every `interval`). +For gRPC servers, Traefik will consider them healthy as long as they return `SERVING` to [gRPC health check v1](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) requests. To propagate status changes (e.g. all servers of this service are down) upwards, HealthCheck must also be enabled on the parent(s) of this service. @@ -324,6 +325,7 @@ Below are the available options for the health check mechanism: - `path` (required), defines the server URL path for the health check endpoint . - `scheme` (optional), replaces the server URL `scheme` for the health check endpoint. +- `mode` (default: http), if set to `grpc`, Traefik uses the gRPC health check protocol to probe the server. - `hostname` (optional), sets the value of `hostname` in the `Host` header of the health check request. 
- `port` (optional), replaces the server URL `port` for the health check endpoint. - `interval` (default: 30s), defines the frequency of the health check calls. diff --git a/pkg/config/dynamic/fixtures/sample.toml b/pkg/config/dynamic/fixtures/sample.toml index fdde014c7..11e586ea0 100644 --- a/pkg/config/dynamic/fixtures/sample.toml +++ b/pkg/config/dynamic/fixtures/sample.toml @@ -422,6 +422,7 @@ url = "foobar" [http.services.Service0.loadBalancer.healthCheck] scheme = "foobar" + mode = "foobar" path = "foobar" port = 42 interval = "foobar" diff --git a/pkg/config/dynamic/http_config.go b/pkg/config/dynamic/http_config.go index 0fa03feba..376223e1c 100644 --- a/pkg/config/dynamic/http_config.go +++ b/pkg/config/dynamic/http_config.go @@ -213,6 +213,7 @@ func (s *Server) SetDefaults() { // ServerHealthCheck holds the HealthCheck configuration. type ServerHealthCheck struct { Scheme string `json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme,omitempty" export:"true"` + Mode string `json:"mode,omitempty" toml:"mode,omitempty" yaml:"mode,omitempty" export:"true"` Path string `json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"` Method string `json:"method,omitempty" toml:"method,omitempty" yaml:"method,omitempty" export:"true"` Port int `json:"port,omitempty" toml:"port,omitempty,omitzero" yaml:"port,omitempty" export:"true"` @@ -229,6 +230,7 @@ type ServerHealthCheck struct { func (h *ServerHealthCheck) SetDefaults() { fr := true h.FollowRedirects = &fr + h.Mode = "http" } // +k8s:deepcopy-gen=true diff --git a/pkg/config/label/label_test.go b/pkg/config/label/label_test.go index 04bce464e..d9b40ffb8 100644 --- a/pkg/config/label/label_test.go +++ b/pkg/config/label/label_test.go @@ -153,6 +153,7 @@ func TestDecodeConfiguration(t *testing.T) { "traefik.http.services.Service0.loadbalancer.healthcheck.method": "foobar", "traefik.http.services.Service0.loadbalancer.healthcheck.port": "42", 
"traefik.http.services.Service0.loadbalancer.healthcheck.scheme": "foobar", + "traefik.http.services.Service0.loadbalancer.healthcheck.mode": "foobar", "traefik.http.services.Service0.loadbalancer.healthcheck.timeout": "foobar", "traefik.http.services.Service0.loadbalancer.healthcheck.followredirects": "true", "traefik.http.services.Service0.loadbalancer.passhostheader": "true", @@ -169,6 +170,7 @@ func TestDecodeConfiguration(t *testing.T) { "traefik.http.services.Service1.loadbalancer.healthcheck.method": "foobar", "traefik.http.services.Service1.loadbalancer.healthcheck.port": "42", "traefik.http.services.Service1.loadbalancer.healthcheck.scheme": "foobar", + "traefik.http.services.Service1.loadbalancer.healthcheck.mode": "foobar", "traefik.http.services.Service1.loadbalancer.healthcheck.timeout": "foobar", "traefik.http.services.Service1.loadbalancer.healthcheck.followredirects": "true", "traefik.http.services.Service1.loadbalancer.passhostheader": "true", @@ -650,6 +652,7 @@ func TestDecodeConfiguration(t *testing.T) { }, HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", + Mode: "foobar", Path: "foobar", Method: "foobar", Port: 42, @@ -678,6 +681,7 @@ func TestDecodeConfiguration(t *testing.T) { }, HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", + Mode: "foobar", Path: "foobar", Method: "foobar", Port: 42, diff --git a/pkg/healthcheck/healthcheck.go b/pkg/healthcheck/healthcheck.go index 77a7743bb..36178ce98 100644 --- a/pkg/healthcheck/healthcheck.go +++ b/pkg/healthcheck/healthcheck.go @@ -19,6 +19,10 @@ import ( "github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/safe" "github.com/vulcand/oxy/roundrobin" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" ) const ( @@ -26,6 +30,11 @@ const ( serverDown = "DOWN" ) +const ( + HTTPMode = "http" + GRPCMode = "grpc" +) + var ( singleton *HealthCheck once sync.Once @@ 
-60,6 +69,7 @@ type Options struct { Headers map[string]string Hostname string Scheme string + Mode string Path string Method string Port int @@ -245,9 +255,18 @@ func NewBackendConfig(options Options, backendName string) *BackendConfig { } } -// checkHealth returns a nil error in case it was successful and otherwise -// a non-nil error with a meaningful description why the health check failed. +// checkHealth calls the proper health check function depending on the +// backend config mode, defaults to HTTP. func checkHealth(serverURL *url.URL, backend *BackendConfig) error { + if backend.Options.Mode == GRPCMode { + return checkHealthGRPC(serverURL, backend) + } + return checkHealthHTTP(serverURL, backend) +} + +// checkHealthHTTP returns an error with a meaningful description if the health check failed. +// Dedicated to HTTP servers. +func checkHealthHTTP(serverURL *url.URL, backend *BackendConfig) error { req, err := backend.newRequest(serverURL) if err != nil { return fmt.Errorf("failed to create HTTP request: %w", err) @@ -280,6 +299,60 @@ func checkHealth(serverURL *url.URL, backend *BackendConfig) error { return nil } +// checkHealthGRPC returns an error with a meaningful description if the health check failed. +// Dedicated to gRPC servers implementing gRPC Health Checking Protocol v1. +func checkHealthGRPC(serverURL *url.URL, backend *BackendConfig) error { + u, err := serverURL.Parse(backend.Path) + if err != nil { + return fmt.Errorf("failed to parse server URL: %w", err) + } + + port := u.Port() + if backend.Options.Port != 0 { + port = strconv.Itoa(backend.Options.Port) + } + + serverAddr := net.JoinHostPort(u.Hostname(), port) + + var opts []grpc.DialOption + switch backend.Options.Scheme { + case "http", "h2c", "": + opts = append(opts, grpc.WithInsecure()) + } + + ctx, cancel := context.WithTimeout(context.Background(), backend.Options.Timeout) + defer cancel() + + conn, err := grpc.DialContext(ctx, serverAddr, opts...) 
+ if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return fmt.Errorf("fail to connect to %s within %s: %w", serverAddr, backend.Options.Timeout, err) + } + return fmt.Errorf("fail to connect to %s: %w", serverAddr, err) + } + defer func() { _ = conn.Close() }() + + resp, err := healthpb.NewHealthClient(conn).Check(ctx, &healthpb.HealthCheckRequest{}) + if err != nil { + if stat, ok := status.FromError(err); ok { + switch stat.Code() { + case codes.Unimplemented: + return fmt.Errorf("gRPC server does not implement the health protocol: %w", err) + case codes.DeadlineExceeded: + return fmt.Errorf("gRPC health check timeout: %w", err) + } + } + + return fmt.Errorf("gRPC health check failed: %w", err) + } + + if resp.Status != healthpb.HealthCheckResponse_SERVING { + return fmt.Errorf("received gRPC status code: %v", resp.Status) + } + + return nil +} + // StatusUpdater should be implemented by a service that, when its status // changes (e.g. all if its children are down), needs to propagate upwards (to // their parent(s)) that change. 
diff --git a/pkg/healthcheck/healthcheck_test.go b/pkg/healthcheck/healthcheck_test.go index 914f48291..43941033a 100644 --- a/pkg/healthcheck/healthcheck_test.go +++ b/pkg/healthcheck/healthcheck_test.go @@ -14,6 +14,7 @@ import ( "github.com/traefik/traefik/v2/pkg/config/runtime" "github.com/traefik/traefik/v2/pkg/testhelpers" "github.com/vulcand/oxy/roundrobin" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) const ( @@ -21,16 +22,12 @@ const ( healthCheckTimeout = 100 * time.Millisecond ) -type testHandler struct { - done func() - healthSequence []int -} - func TestSetBackendsConfiguration(t *testing.T) { testCases := []struct { desc string startHealthy bool - healthSequence []int + mode string + server StartTestServer expectedNumRemovedServers int expectedNumUpsertedServers int expectedGaugeValue float64 @@ -38,7 +35,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "healthy server staying healthy", startHealthy: true, - healthSequence: []int{http.StatusOK}, + server: newHTTPServer(http.StatusOK), expectedNumRemovedServers: 0, expectedNumUpsertedServers: 0, expectedGaugeValue: 1, @@ -46,7 +43,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "healthy server staying healthy (StatusNoContent)", startHealthy: true, - healthSequence: []int{http.StatusNoContent}, + server: newHTTPServer(http.StatusNoContent), expectedNumRemovedServers: 0, expectedNumUpsertedServers: 0, expectedGaugeValue: 1, @@ -54,7 +51,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "healthy server staying healthy (StatusPermanentRedirect)", startHealthy: true, - healthSequence: []int{http.StatusPermanentRedirect}, + server: newHTTPServer(http.StatusPermanentRedirect), expectedNumRemovedServers: 0, expectedNumUpsertedServers: 0, expectedGaugeValue: 1, @@ -62,7 +59,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "healthy server becoming sick", startHealthy: true, - healthSequence: []int{http.StatusServiceUnavailable}, + server: 
newHTTPServer(http.StatusServiceUnavailable), expectedNumRemovedServers: 1, expectedNumUpsertedServers: 0, expectedGaugeValue: 0, @@ -70,7 +67,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "sick server becoming healthy", startHealthy: false, - healthSequence: []int{http.StatusOK}, + server: newHTTPServer(http.StatusOK), expectedNumRemovedServers: 0, expectedNumUpsertedServers: 1, expectedGaugeValue: 1, @@ -78,7 +75,7 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "sick server staying sick", startHealthy: false, - healthSequence: []int{http.StatusServiceUnavailable}, + server: newHTTPServer(http.StatusServiceUnavailable), expectedNumRemovedServers: 0, expectedNumUpsertedServers: 0, expectedGaugeValue: 0, @@ -86,7 +83,52 @@ func TestSetBackendsConfiguration(t *testing.T) { { desc: "healthy server toggling to sick and back to healthy", startHealthy: true, - healthSequence: []int{http.StatusServiceUnavailable, http.StatusOK}, + server: newHTTPServer(http.StatusServiceUnavailable, http.StatusOK), + expectedNumRemovedServers: 1, + expectedNumUpsertedServers: 1, + expectedGaugeValue: 1, + }, + { + desc: "healthy grpc server staying healthy", + mode: "grpc", + startHealthy: true, + server: newGRPCServer(healthpb.HealthCheckResponse_SERVING), + expectedNumRemovedServers: 0, + expectedNumUpsertedServers: 0, + expectedGaugeValue: 1, + }, + { + desc: "healthy grpc server becoming sick", + mode: "grpc", + startHealthy: true, + server: newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING), + expectedNumRemovedServers: 1, + expectedNumUpsertedServers: 0, + expectedGaugeValue: 0, + }, + { + desc: "sick grpc server becoming healthy", + mode: "grpc", + startHealthy: false, + server: newGRPCServer(healthpb.HealthCheckResponse_SERVING), + expectedNumRemovedServers: 0, + expectedNumUpsertedServers: 1, + expectedGaugeValue: 1, + }, + { + desc: "sick grpc server staying sick", + mode: "grpc", + startHealthy: false, + server: 
newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING), + expectedNumRemovedServers: 0, + expectedNumUpsertedServers: 0, + expectedGaugeValue: 0, + }, + { + desc: "healthy grpc server toggling to sick and back to healthy", + mode: "grpc", + startHealthy: true, + server: newGRPCServer(healthpb.HealthCheckResponse_NOT_SERVING, healthpb.HealthCheckResponse_SERVING), expectedNumRemovedServers: 1, expectedNumUpsertedServers: 1, expectedGaugeValue: 1, @@ -98,22 +140,24 @@ func TestSetBackendsConfiguration(t *testing.T) { t.Run(test.desc, func(t *testing.T) { t.Parallel() - // The context is passed to the health check and canonically canceled by - // the test server once all expected requests have been received. + // The context is passed to the health check and + // canonically canceled by the test server once all expected requests have been received. ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - ts := newTestServer(cancel, test.healthSequence) - defer ts.Close() + t.Cleanup(cancel) + + serverURL, timeout := test.server.Start(t, cancel) lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}} - backend := NewBackendConfig(Options{ + + options := Options{ + Mode: test.mode, Path: "/path", Interval: healthCheckInterval, Timeout: healthCheckTimeout, LB: lb, - }, "backendName") + } + backend := NewBackendConfig(options, "backendName") - serverURL := testhelpers.MustParseURL(ts.URL) if test.startHealthy { lb.servers = append(lb.servers, serverURL) } else { @@ -121,6 +165,7 @@ func TestSetBackendsConfiguration(t *testing.T) { } collectingMetrics := &testhelpers.CollectingGauge{} + check := HealthCheck{ Backends: make(map[string]*BackendConfig), metrics: metricsHealthcheck{serverUpGauge: collectingMetrics}, @@ -134,9 +179,6 @@ func TestSetBackendsConfiguration(t *testing.T) { wg.Done() }() - // Make test timeout dependent on number of expected requests, health - // check interval, and a safety margin. 
- timeout := time.Duration(len(test.healthSequence)*int(healthCheckInterval) + 500) select { case <-time.After(timeout): t.Fatal("test did not complete in time") @@ -453,86 +495,6 @@ func TestBalancers_RemoveServer(t *testing.T) { assert.Equal(t, 0, len(balancer2.Servers())) } -type testLoadBalancer struct { - // RWMutex needed due to parallel test execution: Both the system-under-test - // and the test assertions reference the counters. - *sync.RWMutex - numRemovedServers int - numUpsertedServers int - servers []*url.URL - // options is just to make sure that LBStatusUpdater forwards options on Upsert to its BalancerHandler - options []roundrobin.ServerOption -} - -func (lb *testLoadBalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - // noop -} - -func (lb *testLoadBalancer) RemoveServer(u *url.URL) error { - lb.Lock() - defer lb.Unlock() - lb.numRemovedServers++ - lb.removeServer(u) - return nil -} - -func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error { - lb.Lock() - defer lb.Unlock() - lb.numUpsertedServers++ - lb.servers = append(lb.servers, u) - lb.options = append(lb.options, options...) - return nil -} - -func (lb *testLoadBalancer) Servers() []*url.URL { - return lb.servers -} - -func (lb *testLoadBalancer) Options() []roundrobin.ServerOption { - return lb.options -} - -func (lb *testLoadBalancer) removeServer(u *url.URL) { - var i int - var serverURL *url.URL - found := false - for i, serverURL = range lb.servers { - if *serverURL == *u { - found = true - break - } - } - if !found { - return - } - - lb.servers = append(lb.servers[:i], lb.servers[i+1:]...) -} - -func newTestServer(done func(), healthSequence []int) *httptest.Server { - handler := &testHandler{ - done: done, - healthSequence: healthSequence, - } - return httptest.NewServer(handler) -} - -// ServeHTTP returns HTTP response codes following a status sequences. 
-// It calls the given 'done' function once all request health indicators have been depleted. -func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if len(th.healthSequence) == 0 { - panic("received unexpected request") - } - - w.WriteHeader(th.healthSequence[0]) - - th.healthSequence = th.healthSequence[1:] - if len(th.healthSequence) == 0 { - th.done() - } -} - func TestLBStatusUpdater(t *testing.T) { lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}} svInfo := &runtime.ServiceInfo{} diff --git a/pkg/healthcheck/mock_test.go b/pkg/healthcheck/mock_test.go new file mode 100644 index 000000000..19e60b15c --- /dev/null +++ b/pkg/healthcheck/mock_test.go @@ -0,0 +1,205 @@ +package healthcheck + +import ( + "context" + "net" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/traefik/traefik/v2/pkg/testhelpers" + "github.com/vulcand/oxy/roundrobin" + "google.golang.org/grpc" + healthpb "google.golang.org/grpc/health/grpc_health_v1" +) + +type StartTestServer interface { + Start(t *testing.T, done func()) (*url.URL, time.Duration) +} + +type Status interface { + ~int | ~int32 +} + +type HealthSequence[T Status] struct { + sequenceMu sync.Mutex + sequence []T +} + +func (s *HealthSequence[T]) Pop() T { + s.sequenceMu.Lock() + defer s.sequenceMu.Unlock() + + stat := s.sequence[0] + + s.sequence = s.sequence[1:] + + return stat +} + +func (s *HealthSequence[T]) IsEmpty() bool { + s.sequenceMu.Lock() + defer s.sequenceMu.Unlock() + + return len(s.sequence) == 0 +} + +type GRPCServer struct { + status HealthSequence[healthpb.HealthCheckResponse_ServingStatus] + done func() +} + +func newGRPCServer(healthSequence ...healthpb.HealthCheckResponse_ServingStatus) *GRPCServer { + gRPCService := &GRPCServer{ + status: HealthSequence[healthpb.HealthCheckResponse_ServingStatus]{ + sequence: healthSequence, + }, + } + + return gRPCService +} + +func (s *GRPCServer) Check(_ 
context.Context, _ *healthpb.HealthCheckRequest) (*healthpb.HealthCheckResponse, error) { + stat := s.status.Pop() + if s.status.IsEmpty() { + s.done() + } + + return &healthpb.HealthCheckResponse{ + Status: stat, + }, nil +} + +func (s *GRPCServer) Watch(_ *healthpb.HealthCheckRequest, server healthpb.Health_WatchServer) error { + stat := s.status.Pop() + if s.status.IsEmpty() { + s.done() + } + + return server.Send(&healthpb.HealthCheckResponse{ + Status: stat, + }) +} + +func (s *GRPCServer) Start(t *testing.T, done func()) (*url.URL, time.Duration) { + t.Helper() + + listener, err := net.Listen("tcp4", "127.0.0.1:0") + assert.NoError(t, err) + t.Cleanup(func() { _ = listener.Close() }) + + server := grpc.NewServer() + t.Cleanup(server.Stop) + + s.done = done + + healthpb.RegisterHealthServer(server, s) + + go func() { + err := server.Serve(listener) + assert.NoError(t, err) + }() + + // Make test timeout dependent on number of expected requests, health check interval, and a safety margin. + return testhelpers.MustParseURL("http://" + listener.Addr().String()), time.Duration(len(s.status.sequence)*int(healthCheckInterval) + 500) +} + +type HTTPServer struct { + status HealthSequence[int] + done func() +} + +func newHTTPServer(healthSequence ...int) *HTTPServer { + handler := &HTTPServer{ + status: HealthSequence[int]{ + sequence: healthSequence, + }, + } + + return handler +} + +// ServeHTTP returns HTTP response codes following a status sequences. +// It calls the given 'done' function once all request health indicators have been depleted. 
+func (s *HTTPServer) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + stat := s.status.Pop() + + w.WriteHeader(stat) + + if s.status.IsEmpty() { + s.done() + } +} + +func (s *HTTPServer) Start(t *testing.T, done func()) (*url.URL, time.Duration) { + t.Helper() + + s.done = done + + ts := httptest.NewServer(s) + t.Cleanup(ts.Close) + + // Make test timeout dependent on number of expected requests, health check interval, and a safety margin. + return testhelpers.MustParseURL(ts.URL), time.Duration(len(s.status.sequence)*int(healthCheckInterval) + 500) +} + +type testLoadBalancer struct { + // RWMutex needed due to parallel test execution: Both the system-under-test + // and the test assertions reference the counters. + *sync.RWMutex + numRemovedServers int + numUpsertedServers int + servers []*url.URL + // options is just to make sure that LBStatusUpdater forwards options on Upsert to its BalancerHandler + options []roundrobin.ServerOption +} + +func (lb *testLoadBalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // noop +} + +func (lb *testLoadBalancer) RemoveServer(u *url.URL) error { + lb.Lock() + defer lb.Unlock() + lb.numRemovedServers++ + lb.removeServer(u) + return nil +} + +func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error { + lb.Lock() + defer lb.Unlock() + lb.numUpsertedServers++ + lb.servers = append(lb.servers, u) + lb.options = append(lb.options, options...) + return nil +} + +func (lb *testLoadBalancer) Servers() []*url.URL { + return lb.servers +} + +func (lb *testLoadBalancer) Options() []roundrobin.ServerOption { + return lb.options +} + +func (lb *testLoadBalancer) removeServer(u *url.URL) { + var i int + var serverURL *url.URL + found := false + for i, serverURL = range lb.servers { + if *serverURL == *u { + found = true + break + } + } + if !found { + return + } + + lb.servers = append(lb.servers[:i], lb.servers[i+1:]...) 
+} diff --git a/pkg/provider/kv/kv_test.go b/pkg/provider/kv/kv_test.go index 535a48bab..83cea3ad9 100644 --- a/pkg/provider/kv/kv_test.go +++ b/pkg/provider/kv/kv_test.go @@ -48,6 +48,7 @@ func Test_buildConfiguration(t *testing.T) { "traefik/http/services/Service01/loadBalancer/healthCheck/headers/name0": "foobar", "traefik/http/services/Service01/loadBalancer/healthCheck/headers/name1": "foobar", "traefik/http/services/Service01/loadBalancer/healthCheck/scheme": "foobar", + "traefik/http/services/Service01/loadBalancer/healthCheck/mode": "foobar", "traefik/http/services/Service01/loadBalancer/healthCheck/followredirects": "true", "traefik/http/services/Service01/loadBalancer/responseForwarding/flushInterval": "foobar", "traefik/http/services/Service01/loadBalancer/passHostHeader": "true", @@ -642,6 +643,7 @@ func Test_buildConfiguration(t *testing.T) { }, HealthCheck: &dynamic.ServerHealthCheck{ Scheme: "foobar", + Mode: "foobar", Path: "foobar", Port: 42, Interval: "foobar", diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go index 67c2d97d1..2c46da74f 100644 --- a/pkg/server/service/service.go +++ b/pkg/server/service/service.go @@ -360,8 +360,19 @@ func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backe followRedirects = *hc.FollowRedirects } + mode := healthcheck.HTTPMode + switch hc.Mode { + case "": + mode = healthcheck.HTTPMode + case healthcheck.GRPCMode, healthcheck.HTTPMode: + mode = hc.Mode + default: + logger.Errorf("Illegal health check mode for backend '%s'", backend) + } + return &healthcheck.Options{ Scheme: hc.Scheme, + Mode: mode, Path: hc.Path, Method: hc.Method, Port: hc.Port,