refactor: add Safe everywhere is needing.

This commit is contained in:
Fernandez Ludovic 2017-07-19 14:11:45 +02:00 committed by Ludovic Fernandez
parent 3174fb8861
commit c36e0b3b06
8 changed files with 30 additions and 19 deletions

View file

@ -80,7 +80,7 @@ func (d *Datastore) watchChanges() error {
if err != nil { if err != nil {
return err return err
} }
go func() { safe.Go(func() {
ctx, cancel := context.WithCancel(d.ctx) ctx, cancel := context.WithCancel(d.ctx)
operation := func() error { operation := func() error {
for { for {
@ -113,7 +113,7 @@ func (d *Datastore) watchChanges() error {
if err != nil { if err != nil {
log.Errorf("Error in watch datastore: %v", err) log.Errorf("Error in watch datastore: %v", err)
} }
}() })
return nil return nil
} }

View file

@ -155,12 +155,12 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { safe.Go(func() {
select { select {
case <-stop: case <-stop:
cancel() cancel()
} }
}() })
operation := func() error { operation := func() error {
aws, err := p.createClient() aws, err := p.createClient()

View file

@ -112,12 +112,12 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
pool.Go(func(stop chan bool) { pool.Go(func(stop chan bool) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go func() { safe.Go(func() {
select { select {
case <-stop: case <-stop:
cancel() cancel()
} }
}() })
operation := func() error { operation := func() error {
aws, err := p.createClient() aws, err := p.createClient()

View file

@ -52,7 +52,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
} }
ticker := time.NewTicker(delay) ticker := time.NewTicker(delay)
go func() { safe.Go(func() {
for t := range ticker.C { for t := range ticker.C {
log.Debug("Refreshing Provider " + t.String()) log.Debug("Refreshing Provider " + t.String())
@ -68,7 +68,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
Configuration: configuration, Configuration: configuration,
} }
} }
}() })
return nil return nil
} }

View file

@ -148,7 +148,7 @@ func createConfigurationRoutine(t *testing.T, expectedNumFrontends *int, expecte
configurationChan := make(chan types.ConfigMessage) configurationChan := make(chan types.ConfigMessage)
signal := make(chan interface{}) signal := make(chan interface{})
go func() { safe.Go(func() {
for { for {
data := <-configurationChan data := <-configurationChan
assert.Equal(t, "file", data.ProviderName) assert.Equal(t, "file", data.ProviderName)
@ -156,7 +156,7 @@ func createConfigurationRoutine(t *testing.T, expectedNumFrontends *int, expecte
assert.Len(t, data.Configuration.Backends, *expectedNumBackends) assert.Len(t, data.Configuration.Backends, *expectedNumBackends)
signal <- nil signal <- nil
} }
}() })
return configurationChan, signal return configurationChan, signal
} }

View file

@ -6,6 +6,7 @@ import (
"io/ioutil" "io/ioutil"
"time" "time"
"github.com/containous/traefik/safe"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api" "k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/pkg/api/v1"
@ -125,7 +126,9 @@ func (c *clientImpl) WatchIngresses(labelSelector labels.Selector, watchCh chan<
&v1beta1.Ingress{}, &v1beta1.Ingress{},
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.ingController.Run(stopCh) safe.Go(func() {
c.ingController.Run(stopCh)
})
} }
// eventHandlerFunc will pass the obj on to the events channel or drop it // eventHandlerFunc will pass the obj on to the events channel or drop it
@ -180,7 +183,9 @@ func (c *clientImpl) WatchServices(watchCh chan<- interface{}, stopCh <-chan str
&v1.Service{}, &v1.Service{},
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.svcController.Run(stopCh) safe.Go(func() {
c.svcController.Run(stopCh)
})
} }
// GetEndpoints returns the named Endpoints // GetEndpoints returns the named Endpoints
@ -209,7 +214,9 @@ func (c *clientImpl) WatchEndpoints(watchCh chan<- interface{}, stopCh <-chan st
&v1.Endpoints{}, &v1.Endpoints{},
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.epController.Run(stopCh) safe.Go(func() {
c.epController.Run(stopCh)
})
} }
func (c *clientImpl) WatchSecrets(watchCh chan<- interface{}, stopCh <-chan struct{}) { func (c *clientImpl) WatchSecrets(watchCh chan<- interface{}, stopCh <-chan struct{}) {
@ -224,7 +231,9 @@ func (c *clientImpl) WatchSecrets(watchCh chan<- interface{}, stopCh <-chan stru
&v1.Secret{}, &v1.Secret{},
resyncPeriod, resyncPeriod,
newResourceEventHandlerFuncs(watchCh)) newResourceEventHandlerFuncs(watchCh))
go c.secController.Run(stopCh) safe.Go(func() {
c.secController.Run(stopCh)
})
} }
// WatchAll returns events in the cluster and updates the stores via informer // WatchAll returns events in the cluster and updates the stores via informer
@ -243,7 +252,7 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-c
c.WatchEndpoints(eventCh, stopCh) c.WatchEndpoints(eventCh, stopCh)
c.WatchSecrets(eventCh, stopCh) c.WatchSecrets(eventCh, stopCh)
go func() { safe.Go(func() {
defer close(watchCh) defer close(watchCh)
defer close(eventCh) defer close(eventCh)
@ -255,7 +264,7 @@ func (c *clientImpl) WatchAll(labelSelector string, stopCh <-chan struct{}) (<-c
c.fireEvent(event, watchCh) c.fireEvent(event, watchCh)
} }
} }
}() })
return watchCh, nil return watchCh, nil
} }

View file

@ -112,7 +112,9 @@ func (p *Provider) longPoll(client rancher.Client, updateConfiguration func(stri
// Holds the connection until there is either a change in the metadata // Holds the connection until there is either a change in the metadata
// repository or `p.RefreshSeconds` has elapsed. Long polling should be // repository or `p.RefreshSeconds` has elapsed. Long polling should be
// favoured for the most accurate configuration updates. // favoured for the most accurate configuration updates.
go client.OnChange(p.RefreshSeconds, updateConfiguration) safe.Go(func() {
client.OnChange(p.RefreshSeconds, updateConfiguration)
})
<-stop <-stop
} }

View file

@ -134,7 +134,7 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag
systemRouter.Methods("GET").Path(provider.Path + "debug/vars").HandlerFunc(expvarHandler) systemRouter.Methods("GET").Path(provider.Path + "debug/vars").HandlerFunc(expvarHandler)
} }
go func() { safe.Go(func() {
var err error var err error
var negroni = negroni.New() var negroni = negroni.New()
if provider.Auth != nil { if provider.Auth != nil {
@ -155,7 +155,7 @@ func (provider *WebProvider) Provide(configurationChan chan<- types.ConfigMessag
if err != nil { if err != nil {
log.Fatal("Error creating server: ", err) log.Fatal("Error creating server: ", err)
} }
}() })
return nil return nil
} }