From 60ea9199e5b99ef9820ec237ea5a584cd3991322 Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Thu, 9 Mar 2017 16:27:31 +0100 Subject: [PATCH] Start health checks early. Do not wait a full tick cycle to execute the first health check. Additional changes: - Make request timeout configurable (for testing purposes). - Support synchronizing on health check goroutine termination through an internal wait group (for testing purposes). - Stop leaking by closing the HTTP response body. - Extend health check logging and use WARNING level for (continuously) failing health checks. --- healthcheck/healthcheck.go | 113 ++++++++++-------- healthcheck/healthcheck_test.go | 198 ++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 49 deletions(-) create mode 100644 healthcheck/healthcheck_test.go diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index 30b4af495..fb2f74eb4 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -25,10 +25,11 @@ func GetHealthCheck() *HealthCheck { // BackendHealthCheck HealthCheck configuration for a backend type BackendHealthCheck struct { - URL string - Interval time.Duration - DisabledURLs []*url.URL - lb loadBalancer + URL string + Interval time.Duration + DisabledURLs []*url.URL + requestTimeout time.Duration + lb loadBalancer } var launch = false @@ -46,12 +47,19 @@ type loadBalancer interface { } func newHealthCheck() *HealthCheck { - return &HealthCheck{make(map[string]*BackendHealthCheck), nil} + return &HealthCheck{ + Backends: make(map[string]*BackendHealthCheck), + } } // NewBackendHealthCheck Instantiate a new BackendHealthCheck func NewBackendHealthCheck(URL string, interval time.Duration, lb loadBalancer) *BackendHealthCheck { - return &BackendHealthCheck{URL, interval, nil, lb} + return &BackendHealthCheck{ + URL: URL, + Interval: interval, + requestTimeout: 5 * time.Second, + lb: lb, + } } //SetBackendsConfiguration set backends configuration @@ -62,56 +70,63 @@ func (hc *HealthCheck) SetBackendsConfiguration(parentCtx context.Context, backe } ctx, cancel := context.WithCancel(parentCtx) hc.cancel = cancel - hc.execute(ctx) -} -func (hc *HealthCheck) execute(ctx context.Context) { for backendID, backend := range hc.Backends { - currentBackend := backend currentBackendID := backendID + currentBackend := backend safe.Go(func() { - for { - ticker := time.NewTicker(currentBackend.Interval) - select { - case <-ctx.Done(): - log.Debugf("Stopping all current Healthcheck goroutines") - return - case <-ticker.C: - log.Debugf("Refreshing Healthcheck for currentBackend %s ", currentBackendID) - enabledURLs := currentBackend.lb.Servers() - var newDisabledURLs []*url.URL - for _, url := range currentBackend.DisabledURLs { - if checkHealth(url, currentBackend.URL) { - log.Debugf("HealthCheck is up [%s]: Upsert in server list", url.String()) - currentBackend.lb.UpsertServer(url, roundrobin.Weight(1)) - } else { - newDisabledURLs = append(newDisabledURLs, url) - } - } - currentBackend.DisabledURLs = newDisabledURLs - - for _, url := range enabledURLs { - if !checkHealth(url, currentBackend.URL) { - log.Debugf("HealthCheck has failed [%s]: Remove from server list", url.String()) - currentBackend.lb.RemoveServer(url) - currentBackend.DisabledURLs = append(currentBackend.DisabledURLs, url) - } - } - - } - } + hc.execute(ctx, currentBackendID, currentBackend) }) } } -func checkHealth(serverURL *url.URL, checkURL string) bool { - timeout := time.Duration(5 * time.Second) - client := http.Client{ - Timeout: timeout, +func (hc *HealthCheck) execute(ctx context.Context, backendID string, backend *BackendHealthCheck) { + log.Debugf("Initial healthcheck for currentBackend %s ", backendID) + checkBackend(backend) + ticker := time.NewTicker(backend.Interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + log.Debugf("Stopping all current Healthcheck goroutines") + return + case <-ticker.C: + log.Debugf("Refreshing healthcheck for currentBackend %s ", backendID) + checkBackend(backend) + } } - resp, err := client.Get(serverURL.String() + checkURL) - if err != nil || resp.StatusCode != 200 { - return false - } - return true +} + +func checkBackend(currentBackend *BackendHealthCheck) { + enabledURLs := currentBackend.lb.Servers() + var newDisabledURLs []*url.URL + for _, url := range currentBackend.DisabledURLs { + if checkHealth(url, currentBackend) { + log.Debugf("HealthCheck is up [%s]: Upsert in server list", url.String()) + currentBackend.lb.UpsertServer(url, roundrobin.Weight(1)) + } else { + log.Warnf("HealthCheck is still failing [%s]", url.String()) + newDisabledURLs = append(newDisabledURLs, url) + } + } + currentBackend.DisabledURLs = newDisabledURLs + + for _, url := range enabledURLs { + if !checkHealth(url, currentBackend) { + log.Warnf("HealthCheck has failed [%s]: Remove from server list", url.String()) + currentBackend.lb.RemoveServer(url) + currentBackend.DisabledURLs = append(currentBackend.DisabledURLs, url) + } + } +} + +func checkHealth(serverURL *url.URL, backend *BackendHealthCheck) bool { + client := http.Client{ + Timeout: backend.requestTimeout, + } + resp, err := client.Get(serverURL.String() + backend.URL) + if err == nil { + defer resp.Body.Close() + } + return err == nil && resp.StatusCode == 200 } diff --git a/healthcheck/healthcheck_test.go b/healthcheck/healthcheck_test.go new file mode 100644 index 000000000..c95a0f5d2 --- /dev/null +++ b/healthcheck/healthcheck_test.go @@ -0,0 +1,198 @@ +package healthcheck + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "testing" + "time" + + "github.com/vulcand/oxy/roundrobin" +) + +const healthCheckInterval = 100 * time.Millisecond + +type testLoadBalancer struct { + // RWMutex needed due to parallel test execution: Both the system-under-test + // and the test assertions reference the counters. + *sync.RWMutex + numRemovedServers int + numUpsertedServers int + servers []*url.URL +} + +func (lb *testLoadBalancer) RemoveServer(u *url.URL) error { + lb.Lock() + defer lb.Unlock() + lb.numRemovedServers++ + lb.removeServer(u) + return nil +} + +func (lb *testLoadBalancer) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error { + lb.Lock() + defer lb.Unlock() + lb.numUpsertedServers++ + lb.servers = append(lb.servers, u) + return nil +} + +func (lb *testLoadBalancer) Servers() []*url.URL { + return lb.servers +} + +func (lb *testLoadBalancer) removeServer(u *url.URL) { + var i int + var serverURL *url.URL + for i, serverURL = range lb.servers { + if *serverURL == *u { + break + } + } + + lb.servers = append(lb.servers[:i], lb.servers[i+1:]...) +} + +type testHandler struct { + done func() + healthSequence []bool +} + +func newTestServer(done func(), healthSequence []bool) *httptest.Server { + handler := &testHandler{ + done: done, + healthSequence: healthSequence, + } + return httptest.NewServer(handler) +} + +// ServeHTTP returns 200 or 503 HTTP response codes depending on whether the +// current request is marked as healthy or not. +// It calls the given 'done' function once all request health indicators have +// been depleted. +func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if len(th.healthSequence) == 0 { + panic("received unexpected request") + } + + healthy := th.healthSequence[0] + if healthy { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + } + + th.healthSequence = th.healthSequence[1:] + if len(th.healthSequence) == 0 { + th.done() + } +} + +func TestSetBackendsConfiguration(t *testing.T) { + tests := []struct { + desc string + startHealthy bool + healthSequence []bool + wantNumRemovedServers int + wantNumUpsertedServers int + }{ + { + desc: "healthy server staying healthy", + startHealthy: true, + healthSequence: []bool{true}, + wantNumRemovedServers: 0, + wantNumUpsertedServers: 0, + }, + { + desc: "healthy server becoming sick", + startHealthy: true, + healthSequence: []bool{false}, + wantNumRemovedServers: 1, + wantNumUpsertedServers: 0, + }, + { + desc: "sick server becoming healthy", + startHealthy: false, + healthSequence: []bool{true}, + wantNumRemovedServers: 0, + wantNumUpsertedServers: 1, + }, + { + desc: "sick server staying sick", + startHealthy: false, + healthSequence: []bool{false}, + wantNumRemovedServers: 0, + wantNumUpsertedServers: 0, + }, + { + desc: "healthy server toggling to sick and back to healthy", + startHealthy: true, + healthSequence: []bool{false, true}, + wantNumRemovedServers: 1, + wantNumUpsertedServers: 1, + }, + } + + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + // The context is passed to the health check and canonically cancelled by + // the test server once all expected requests have been received. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts := newTestServer(cancel, test.healthSequence) + defer ts.Close() + + lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}} + backend := NewBackendHealthCheck("/path", healthCheckInterval, lb) + serverURL := MustParseURL(ts.URL) + if test.startHealthy { + lb.servers = append(lb.servers, serverURL) + } else { + backend.DisabledURLs = append(backend.DisabledURLs, serverURL) + } + + healthCheck := HealthCheck{ + Backends: make(map[string]*BackendHealthCheck), + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + healthCheck.execute(ctx, "id", backend) + wg.Done() + }() + + // Make test timeout dependent on number of expected requests, health + // check interval, and a safety margin. + timeout := time.Duration(len(test.healthSequence)*int(healthCheckInterval) + 500) + select { + case <-time.After(timeout): + t.Fatal("test did not complete in time") + case <-ctx.Done(): + wg.Wait() + } + + lb.Lock() + defer lb.Unlock() + if lb.numRemovedServers != test.wantNumRemovedServers { + t.Errorf("got %d removed servers, wanted %d", lb.numRemovedServers, test.wantNumRemovedServers) + } + + if lb.numUpsertedServers != test.wantNumUpsertedServers { + t.Errorf("got %d upserted servers, wanted %d", lb.numUpsertedServers, test.wantNumUpsertedServers) + } + }) + } +} + +func MustParseURL(rawurl string) *url.URL { + u, err := url.Parse(rawurl) + if err != nil { + panic(fmt.Sprintf("failed to parse URL '%s': %s", rawurl, err)) + } + return u +}