traefik/pkg/middlewares/ratelimiter/rate_limiter.go

147 lines
4.3 KiB
Go
Raw Normal View History

// Package ratelimiter implements a rate limiting and traffic shaping middleware with a set of token buckets.
2018-11-14 09:18:03 +00:00
package ratelimiter
import (
"context"
"fmt"
2018-11-14 09:18:03 +00:00
"net/http"
"sync"
2018-11-14 09:18:03 +00:00
"time"
2019-08-03 01:58:23 +00:00
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/log"
2019-08-03 01:58:23 +00:00
"github.com/containous/traefik/v2/pkg/middlewares"
"github.com/containous/traefik/v2/pkg/tracing"
"github.com/mailgun/ttlmap"
2018-11-14 09:18:03 +00:00
"github.com/opentracing/opentracing-go/ext"
"github.com/vulcand/oxy/utils"
"golang.org/x/time/rate"
2018-11-14 09:18:03 +00:00
)
const (
	// typeName is the middleware type name attached to this middleware's log entries.
	typeName = "RateLimiterType"
	// maxSources is the capacity given to the bucket map, i.e. the maximum
	// number of traffic sources tracked at the same time.
	maxSources = 65536
)
// rateLimiter implements rate limiting and traffic shaping with a set of token buckets;
// one for each traffic source. The same parameters are applied to all the buckets.
2018-11-14 09:18:03 +00:00
type rateLimiter struct {
name string
rate rate.Limit // reqs/s
burst int64
// maxDelay is the maximum duration we're willing to wait for a bucket reservation to become effective, in nanoseconds.
// For now it is somewhat arbitrarily set to 1/rate.
maxDelay time.Duration
sourceMatcher utils.SourceExtractor
next http.Handler
bucketsMu sync.Mutex
buckets *ttlmap.TtlMap // actual buckets, keyed by source.
2018-11-14 09:18:03 +00:00
}
// New returns a rate limiter middleware.
func New(ctx context.Context, next http.Handler, config dynamic.RateLimit, name string) (http.Handler, error) {
ctxLog := log.With(ctx, log.Str(log.MiddlewareName, name), log.Str(log.MiddlewareType, typeName))
log.FromContext(ctxLog).Debug("Creating middleware")
2018-11-14 09:18:03 +00:00
if config.SourceCriterion == nil ||
config.SourceCriterion.IPStrategy == nil &&
config.SourceCriterion.RequestHeaderName == "" && !config.SourceCriterion.RequestHost {
config.SourceCriterion = &dynamic.SourceCriterion{
IPStrategy: &dynamic.IPStrategy{},
}
2018-11-14 09:18:03 +00:00
}
sourceMatcher, err := middlewares.GetSourceExtractor(ctxLog, config.SourceCriterion)
if err != nil {
return nil, err
2018-11-14 09:18:03 +00:00
}
buckets, err := ttlmap.NewMap(maxSources)
2018-11-14 09:18:03 +00:00
if err != nil {
return nil, err
}
burst := config.Burst
if burst <= 0 {
burst = 1
}
// Logically, we should set maxDelay to ~infinity when config.Average == 0 (because it means to rate limiting),
// but since the reservation will give us a delay = 0 anyway in this case, we're good even with any maxDelay >= 0.
var maxDelay time.Duration
if config.Average != 0 {
maxDelay = time.Second / time.Duration(config.Average*2)
}
return &rateLimiter{
name: name,
rate: rate.Limit(config.Average),
burst: burst,
maxDelay: maxDelay,
next: next,
sourceMatcher: sourceMatcher,
buckets: buckets,
}, nil
2018-11-14 09:18:03 +00:00
}
// GetTracingInformation returns the middleware name together with the
// tracing.SpanKindNoneEnum span kind.
func (rl *rateLimiter) GetTracingInformation() (string, ext.SpanKindEnum) {
	spanName := rl.name
	return spanName, tracing.SpanKindNoneEnum
}
func (rl *rateLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := middlewares.GetLogger(r.Context(), rl.name, typeName)
source, amount, err := rl.sourceMatcher.Extract(r)
if err != nil {
logger.Errorf("could not extract source of request: %v", err)
http.Error(w, "could not extract source of request", http.StatusInternalServerError)
return
}
if amount != 1 {
logger.Infof("ignoring token bucket amount > 1: %d", amount)
}
rl.bucketsMu.Lock()
defer rl.bucketsMu.Unlock()
var bucket *rate.Limiter
if rlSource, exists := rl.buckets.Get(source); exists {
bucket = rlSource.(*rate.Limiter)
} else {
bucket = rate.NewLimiter(rl.rate, int(rl.burst))
if err := rl.buckets.Set(source, bucket, int(rl.maxDelay)*10+1); err != nil {
logger.Errorf("could not insert bucket: %v", err)
http.Error(w, "could not insert bucket", http.StatusInternalServerError)
return
}
}
res := bucket.Reserve()
if !res.OK() {
http.Error(w, "No bursty traffic allowed", http.StatusTooManyRequests)
return
}
delay := res.Delay()
if delay > rl.maxDelay {
res.Cancel()
rl.serveDelayError(w, r, delay)
return
}
time.Sleep(delay)
rl.next.ServeHTTP(w, r)
2018-11-14 09:18:03 +00:00
}
func (rl *rateLimiter) serveDelayError(w http.ResponseWriter, r *http.Request, delay time.Duration) {
w.Header().Set("Retry-After", fmt.Sprintf("%.0f", delay.Seconds()))
w.Header().Set("X-Retry-In", delay.String())
w.WriteHeader(http.StatusTooManyRequests)
if _, err := w.Write([]byte(http.StatusText(http.StatusTooManyRequests))); err != nil {
middlewares.GetLogger(r.Context(), rl.name, typeName).Errorf("could not serve 429: %v", err)
}
2018-11-14 09:18:03 +00:00
}