Fix spamming events in listenProviders

Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
Emile Vauge 2016-06-22 18:31:14 +02:00
parent 2a209c23c4
commit 606e667b88
No known key found for this signature in database
GPG key ID: D808B4C167352E59
2 changed files with 70 additions and 47 deletions

View file

@ -10,6 +10,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect"
"strconv" "strconv"
"strings" "strings"
"text/template" "text/template"
@ -53,6 +54,7 @@ type Kubernetes struct {
Endpoint string `description:"Kubernetes server endpoint"` Endpoint string `description:"Kubernetes server endpoint"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"` DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
Namespaces Namespaces `description:"Kubernetes namespaces"` Namespaces Namespaces `description:"Kubernetes namespaces"`
lastConfiguration safe.Safe
} }
func (provider *Kubernetes) createClient() (k8s.Client, error) { func (provider *Kubernetes) createClient() (k8s.Client, error) {
@ -124,9 +126,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
if err != nil { if err != nil {
return err return err
} }
configurationChan <- types.ConfigMessage{ if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
ProviderName: "kubernetes", log.Debugf("Skipping event from kubernetes %+v", event)
Configuration: provider.loadConfig(*templateObjects), } else {
provider.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
} }
} }
} }
@ -146,9 +153,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
if err != nil { if err != nil {
return err return err
} }
configurationChan <- types.ConfigMessage{ if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
ProviderName: "kubernetes", log.Debugf("Skipping configuration from kubernetes %+v", templateObjects)
Configuration: provider.loadConfig(*templateObjects), } else {
provider.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
} }
return nil return nil

View file

@ -137,28 +137,48 @@ func (server *Server) listenProviders(stop chan bool) {
if !ok { if !ok {
return return
} }
server.defaultConfigurationValues(configMsg.Configuration)
currentConfigurations := server.currentConfigurations.Get().(configs)
jsonConf, _ := json.Marshal(configMsg.Configuration) jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf)) log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
lastConfigs.Set(configMsg.ProviderName, &configMsg) if configMsg.Configuration == nil || configMsg.Configuration.Backends == nil && configMsg.Configuration.Frontends == nil {
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { } else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
} else { } else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) lastConfigs.Set(configMsg.ProviderName, &configMsg)
safe.Go(func() { lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
<-time.After(server.globalConfiguration.ProvidersThrottleDuration) if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { // last config received more than n s ago
log.Debugf("Waited for %s config, OK", configMsg.ProviderName) server.configurationValidatedChan <- configMsg
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok { } else {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage) log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
}
} }
} })
}) }
lastReceivedConfiguration.Set(time.Now())
} }
lastReceivedConfiguration.Set(time.Now()) }
}
}
func (server *Server) defaultConfigurationValues(configuration *types.Configuration) {
if configuration == nil || configuration.Frontends == nil {
return
}
for _, frontend := range configuration.Frontends {
// default endpoints if not defined in frontends
if len(frontend.EntryPoints) == 0 {
frontend.EntryPoints = server.globalConfiguration.DefaultEntryPoints
} }
} }
} }
@ -173,28 +193,23 @@ func (server *Server) listenConfigurations(stop chan bool) {
return return
} }
currentConfigurations := server.currentConfigurations.Get().(configs) currentConfigurations := server.currentConfigurations.Get().(configs)
if configMsg.Configuration == nil {
log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
} else {
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration) // Copy configurations to new map so we don't change current if LoadConfig fails
if err == nil { newConfigurations := make(configs)
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints { for k, v := range currentConfigurations {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler()) newConfigurations[k] = v
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr) }
} newConfigurations[configMsg.ProviderName] = configMsg.Configuration
server.currentConfigurations.Set(newConfigurations)
} else { newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
log.Error("Error loading new configuration, aborted ", err) if err == nil {
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
} }
server.currentConfigurations.Set(newConfigurations)
} else {
log.Error("Error loading new configuration, aborted ", err)
} }
} }
} }
@ -376,10 +391,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Debugf("Creating frontend %s", frontendName) log.Debugf("Creating frontend %s", frontendName)
fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader)) fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader))
saveBackend := middlewares.NewSaveBackend(fwd) saveBackend := middlewares.NewSaveBackend(fwd)
// default endpoints if not defined in frontends
if len(frontend.EntryPoints) == 0 {
frontend.EntryPoints = globalConfiguration.DefaultEntryPoints
}
if len(frontend.EntryPoints) == 0 { if len(frontend.EntryPoints) == 0 {
log.Errorf("No entrypoint defined for frontend %s, defaultEntryPoints:%s", frontendName, globalConfiguration.DefaultEntryPoints) log.Errorf("No entrypoint defined for frontend %s, defaultEntryPoints:%s", frontendName, globalConfiguration.DefaultEntryPoints)
log.Errorf("Skipping frontend %s...", frontendName) log.Errorf("Skipping frontend %s...", frontendName)