From bcc5f24c0fcf392c77d1677c98207fb9b958e1fd Mon Sep 17 00:00:00 2001 From: Emile Vauge Date: Thu, 31 Mar 2016 18:57:08 +0200 Subject: [PATCH] Add GoSafe goroutine launch Signed-off-by: Emile Vauge --- acme/acme.go | 9 ++++++--- provider/consul_catalog.go | 9 +++++---- provider/docker.go | 5 +++-- provider/file.go | 5 +++-- provider/kv.go | 5 ++++- provider/kv_test.go | 5 ++++- provider/marathon.go | 5 +++-- safe/safe.go | 28 ++++++++++++++++++++++++++++ script/deploy.sh | 2 +- server.go | 17 +++++++++++------ 10 files changed, 68 insertions(+), 22 deletions(-) create mode 100644 safe/safe.go diff --git a/acme/acme.go b/acme/acme.go index 9b41778e4..6ebbe6590 100644 --- a/acme/acme.go +++ b/acme/acme.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" log "github.com/Sirupsen/logrus" + "github.com/containous/traefik/safe" "github.com/xenolf/lego/acme" "io/ioutil" fmtlog "log" @@ -242,7 +243,9 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma return err } - go a.retrieveCertificates(client, account) + safe.Go(func() { + a.retrieveCertificates(client, account) + }) tlsConfig.GetCertificate = func(clientHello *tls.ClientHelloInfo) (*tls.Certificate, error) { if challengeCert, ok := wrapperChallengeProvider.getCertificate(clientHello.ServerName); ok { @@ -261,7 +264,7 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma } ticker := time.NewTicker(24 * time.Hour) - go func() { + safe.Go(func() { for { select { case <-ticker.C: @@ -272,7 +275,7 @@ func (a *ACME) CreateConfig(tlsConfig *tls.Config, CheckOnDemandDomain func(doma } } - }() + }) return nil } diff --git a/provider/consul_catalog.go b/provider/consul_catalog.go index 01c823058..66dd9926e 100644 --- a/provider/consul_catalog.go +++ b/provider/consul_catalog.go @@ -8,6 +8,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/cenkalti/backoff" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/hashicorp/consul/api" ) @@ -35,7 +36,7 @@ func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[ catalog := provider.client.Catalog() - go func() { + safe.Go(func() { defer close(watchCh) opts := &api.QueryOptions{WaitTime: DefaultWatchWaitTime} @@ -64,7 +65,7 @@ func (provider *ConsulCatalog) watchServices(stopCh <-chan struct{}) <-chan map[ watchCh <- data } } - }() + }) return watchCh } @@ -182,7 +183,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess } provider.client = client - go func() { + safe.Go(func() { notify := func(err error, time time.Duration) { log.Errorf("Consul connection error %+v, retrying in %s", err, time) } @@ -193,7 +194,7 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess if err != nil { log.Fatalf("Cannot connect to consul server %+v", err) } - }() + }) return err } diff --git a/provider/docker.go b/provider/docker.go index e4190990b..2ad18ee4c 100644 --- a/provider/docker.go +++ b/provider/docker.go @@ -10,6 +10,7 @@ import ( "github.com/BurntSushi/ty/fun" log "github.com/Sirupsen/logrus" "github.com/cenkalti/backoff" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/fsouza/go-dockerclient" ) @@ -33,7 +34,7 @@ type DockerTLS struct { // Provide allows the provider to provide configurations to traefik // using the given configuration channel. func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) error { - go func() { + safe.Go(func() { operation := func() error { var dockerClient *docker.Client var err error @@ -93,7 +94,7 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage) er if err != nil { log.Fatalf("Cannot connect to docker server %+v", err) } - }() + }) return nil } diff --git a/provider/file.go b/provider/file.go index 624a0613e..714eb1901 100644 --- a/provider/file.go +++ b/provider/file.go @@ -7,6 +7,7 @@ import ( "github.com/BurntSushi/toml" log "github.com/Sirupsen/logrus" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "gopkg.in/fsnotify.v1" ) @@ -34,7 +35,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro if provider.Watch { // Process events - go func() { + safe.Go(func() { defer watcher.Close() for { select { @@ -53,7 +54,7 @@ func (provider *File) Provide(configurationChan chan<- types.ConfigMessage) erro log.Error("Watcher event error", error) } } - }() + }) err = watcher.Add(filepath.Dir(file.Name())) if err != nil { log.Error("Error adding file watcher", err) diff --git a/provider/kv.go b/provider/kv.go index 122b150cd..b061d6f76 100644 --- a/provider/kv.go +++ b/provider/kv.go @@ -12,6 +12,7 @@ import ( "github.com/BurntSushi/ty/fun" log "github.com/Sirupsen/logrus" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/docker/libkv" "github.com/docker/libkv/store" @@ -101,7 +102,9 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage) error } provider.kvclient = kv if provider.Watch { - go provider.watchKv(configurationChan, provider.Prefix) + safe.Go(func() { + provider.watchKv(configurationChan, provider.Prefix) + }) } configuration := provider.loadConfig() configurationChan <- types.ConfigMessage{ diff --git a/provider/kv_test.go b/provider/kv_test.go index a2438c344..3f4cd4759 100644 --- a/provider/kv_test.go +++ b/provider/kv_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/containous/traefik/safe" "github.com/docker/libkv/store" "reflect" "sort" @@ -256,7 +257,9 @@ func TestKvWatchTree(t *testing.T) { } configChan := make(chan types.ConfigMessage) - go provider.watchKv(configChan, "prefix") + safe.Go(func() { + provider.watchKv(configChan, "prefix") + }) select { case c1 := <-returnedChans: diff --git a/provider/marathon.go b/provider/marathon.go index b255bbe5f..e2dfbb3fb 100644 --- a/provider/marathon.go +++ b/provider/marathon.go @@ -10,6 +10,7 @@ import ( "crypto/tls" "github.com/BurntSushi/ty/fun" log "github.com/Sirupsen/logrus" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/gambol99/go-marathon" "net/http" @@ -63,7 +64,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage) if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil { log.Errorf("Failed to register for events, %s", err) } else { - go func() { + safe.Go(func() { for { event := <-update log.Debug("Marathon event receveived", event) @@ -75,7 +76,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage) } } } - }() + }) } } diff --git a/safe/safe.go b/safe/safe.go new file mode 100644 index 000000000..6cace27a5 --- /dev/null +++ b/safe/safe.go @@ -0,0 +1,28 @@ +package safe + +import ( + "log" + "runtime/debug" +) + +// Go starts a recoverable goroutine +func Go(goroutine func()) { + GoWithRecover(goroutine, defaultRecoverGoroutine) +} + +// GoWithRecover starts a recoverable goroutine using given customRecover() function +func GoWithRecover(goroutine func(), customRecover func(err interface{})) { + go func() { + defer func() { + if err := recover(); err != nil { + customRecover(err) + } + }() + goroutine() + }() +} + +func defaultRecoverGoroutine(err interface{}) { + log.Println(err) + debug.PrintStack() +} diff --git a/script/deploy.sh b/script/deploy.sh index c906d4531..1c9b60a1d 100755 --- a/script/deploy.sh +++ b/script/deploy.sh @@ -32,7 +32,7 @@ docker login -e $DOCKER_EMAIL -u $DOCKER_USER -p $DOCKER_PASS docker tag containous/traefik emilevauge/traefik:latest docker push emilevauge/traefik:latest docker tag emilevauge/traefik:latest emilevauge/traefik:${VERSION} -docker push -q emilevauge/traefik:${VERSION} +docker push emilevauge/traefik:${VERSION} cd .. rm -Rf traefik-library-image/ diff --git a/server.go b/server.go index 95f8eb0be..820968ff3 100644 --- a/server.go +++ b/server.go @@ -27,6 +27,7 @@ import ( "github.com/containous/oxy/stream" "github.com/containous/traefik/middlewares" "github.com/containous/traefik/provider" + "github.com/containous/traefik/safe" "github.com/containous/traefik/types" "github.com/gorilla/mux" "github.com/mailgun/manners" @@ -81,8 +82,12 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server { // Start starts the server and blocks until server is shutted down. func (server *Server) Start() { server.startHTTPServers() - go server.listenProviders() - go server.listenConfigurations() + safe.Go(func() { + server.listenProviders() + }) + safe.Go(func() { + server.listenConfigurations() + }) server.configureProviders() server.startProviders() go server.listenSignals() @@ -133,13 +138,13 @@ func (server *Server) listenProviders() { server.configurationValidatedChan <- configMsg } else { log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) - go func() { + safe.Go(func() { <-time.After(server.globalConfiguration.ProvidersThrottleDuration) if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { log.Debugf("Waited for %s config, OK", configMsg.ProviderName) server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName] } - }() + }) } lastReceivedConfiguration = time.Now() } @@ -214,12 +219,12 @@ func (server *Server) startProviders() { jsonConf, _ := json.Marshal(provider) log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf) currentProvider := provider - go func() { + safe.Go(func() { err := currentProvider.Provide(server.configurationChan) if err != nil { log.Errorf("Error starting provider %s", err) } - }() + }) } }