From aa705dd69119c2c9821c25444b87ceb8d38dd623 Mon Sep 17 00:00:00 2001 From: Michael Date: Wed, 20 Jun 2018 09:12:03 +0200 Subject: [PATCH] Create middleware to be able to handle HTTP pipelining correctly --- Gopkg.lock | 2 +- middlewares/pipelining/pipelining.go | 62 +++++++++++++++ middlewares/pipelining/pipelining_test.go | 69 +++++++++++++++++ server/server.go | 3 + .../vulcand/oxy/cbreaker/cbreaker.go | 2 +- vendor/github.com/vulcand/oxy/forward/fwd.go | 36 ++++----- .../vulcand/oxy/roundrobin/rebalancer.go | 2 +- .../github.com/vulcand/oxy/utils/netutils.go | 76 +++++++------------ 8 files changed, 179 insertions(+), 73 deletions(-) create mode 100644 middlewares/pipelining/pipelining.go create mode 100644 middlewares/pipelining/pipelining_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 504e2c82c..bd75db1db 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1217,7 +1217,7 @@ "roundrobin", "utils" ] - revision = "d5b73186eed4aa34b52748699ad19e90f61d4059" + revision = "c2414f4542f085363f490048da2fbec5e4537eb6" [[projects]] name = "github.com/vulcand/predicate" diff --git a/middlewares/pipelining/pipelining.go b/middlewares/pipelining/pipelining.go new file mode 100644 index 000000000..ce06d79c9 --- /dev/null +++ b/middlewares/pipelining/pipelining.go @@ -0,0 +1,62 @@ +package pipelining + +import ( + "bufio" + "net" + "net/http" +) + +// Pipelining returns a middleware +type Pipelining struct { + next http.Handler +} + +// NewPipelining returns a new Pipelining instance +func NewPipelining(next http.Handler) *Pipelining { + return &Pipelining{ + next: next, + } +} + +func (p *Pipelining) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + // https://github.com/golang/go/blob/3d59583836630cf13ec4bfbed977d27b1b7adbdc/src/net/http/server.go#L201-L218 + if r.Method == http.MethodPut || r.Method == http.MethodPost { + p.next.ServeHTTP(rw, r) + } else { + p.next.ServeHTTP(&writerWithoutCloseNotify{rw}, r) + } + +} + +// writerWithoutCloseNotify helps to disable closeNotify +type writerWithoutCloseNotify struct { + W http.ResponseWriter +} + +// Header returns the response headers. +func (w *writerWithoutCloseNotify) Header() http.Header { + return w.W.Header() +} + +// Write writes the data to the connection as part of an HTTP reply. +func (w *writerWithoutCloseNotify) Write(buf []byte) (int, error) { + return w.W.Write(buf) +} + +// WriteHeader sends an HTTP response header with the provided +// status code. +func (w *writerWithoutCloseNotify) WriteHeader(code int) { + w.W.WriteHeader(code) +} + +// Flush sends any buffered data to the client. +func (w *writerWithoutCloseNotify) Flush() { + if f, ok := w.W.(http.Flusher); ok { + f.Flush() + } +} + +// Hijack hijacks the connection. +func (w *writerWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return w.W.(http.Hijacker).Hijack() +} diff --git a/middlewares/pipelining/pipelining_test.go b/middlewares/pipelining/pipelining_test.go new file mode 100644 index 000000000..b5b327a41 --- /dev/null +++ b/middlewares/pipelining/pipelining_test.go @@ -0,0 +1,69 @@ +package pipelining + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +type recorderWithCloseNotify struct { + *httptest.ResponseRecorder +} + +func (r *recorderWithCloseNotify) CloseNotify() <-chan bool { + panic("implement me") +} + +func TestNewPipelining(t *testing.T) { + testCases := []struct { + desc string + HTTPMethod string + implementCloseNotifier bool + }{ + { + desc: "should not implement CloseNotifier with GET method", + HTTPMethod: http.MethodGet, + implementCloseNotifier: false, + }, + { + desc: "should implement CloseNotifier with PUT method", + HTTPMethod: http.MethodPut, + implementCloseNotifier: true, + }, + { + desc: "should implement CloseNotifier with POST method", + HTTPMethod: http.MethodPost, + implementCloseNotifier: true, + }, + { + desc: "should not implement CloseNotifier with GET method", + HTTPMethod: http.MethodHead, + implementCloseNotifier: false, + }, + { + desc: "should not implement CloseNotifier with PROPFIND method", + HTTPMethod: "PROPFIND", + implementCloseNotifier: false, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + nextHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, ok := w.(http.CloseNotifier) + assert.Equal(t, test.implementCloseNotifier, ok) + w.WriteHeader(http.StatusOK) + }) + handler := NewPipelining(nextHandler) + + req := httptest.NewRequest(test.HTTPMethod, "http://localhost", nil) + + handler.ServeHTTP(&recorderWithCloseNotify{httptest.NewRecorder()}, req) + }) + } +} diff --git a/server/server.go b/server/server.go index f163bfef7..adc384d7c 100644 --- a/server/server.go +++ b/server/server.go @@ -32,6 +32,7 @@ import ( "github.com/containous/traefik/middlewares/accesslog" mauth "github.com/containous/traefik/middlewares/auth" "github.com/containous/traefik/middlewares/errorpages" + "github.com/containous/traefik/middlewares/pipelining" "github.com/containous/traefik/middlewares/redirect" "github.com/containous/traefik/middlewares/tracing" "github.com/containous/traefik/provider" @@ -1023,6 +1024,8 @@ func (s *Server) loadConfig(configurations types.Configurations, globalConfigura }) } + fwd = pipelining.NewPipelining(fwd) + var rr *roundrobin.RoundRobin var saveFrontend http.Handler if s.accessLoggerMiddleware != nil { diff --git a/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go b/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go index e7f92f71f..5991a8474 100644 --- a/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go +++ b/vendor/github.com/vulcand/oxy/cbreaker/cbreaker.go @@ -156,7 +156,7 @@ func (c *CircuitBreaker) activateFallback(w http.ResponseWriter, req *http.Reque func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) { start := c.clock.UtcNow() - p := utils.NewSimpleProxyWriter(w) + p := utils.NewProxyWriter(w) c.next.ServeHTTP(p, req) diff --git a/vendor/github.com/vulcand/oxy/forward/fwd.go b/vendor/github.com/vulcand/oxy/forward/fwd.go index 668cd920b..abeb3c08e 100644 --- a/vendor/github.com/vulcand/oxy/forward/fwd.go +++ b/vendor/github.com/vulcand/oxy/forward/fwd.go @@ -466,16 +466,6 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct defer logEntry.Debug("vulcand/oxy/forward/http: completed ServeHttp on request") } - var pw utils.ProxyWriter - - // Disable closeNotify when method GET for http pipelining - // Waiting for https://github.com/golang/go/issues/23921 - if inReq.Method == http.MethodGet { - pw = utils.NewProxyWriterWithoutCloseNotify(w) - } else { - pw = utils.NewSimpleProxyWriter(w) - } - start := time.Now().UTC() outReq := new(http.Request) @@ -490,18 +480,24 @@ func (f *httpForwarder) serveHTTP(w http.ResponseWriter, inReq *http.Request, ct ModifyResponse: f.modifyResponse, BufferPool: f.bufferPool, } - revproxy.ServeHTTP(pw, outReq) - if inReq.TLS != nil { - f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v", - inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start), - inReq.TLS.Version, - inReq.TLS.DidResume, - inReq.TLS.CipherSuite, - inReq.TLS.ServerName) + if f.log.GetLevel() >= log.DebugLevel { + pw := utils.NewProxyWriter(w) + revproxy.ServeHTTP(pw, outReq) + + if inReq.TLS != nil { + f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v tls:version: %x, tls:resume:%t, tls:csuite:%x, tls:server:%v", + inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start), + inReq.TLS.Version, + inReq.TLS.DidResume, + inReq.TLS.CipherSuite, + inReq.TLS.ServerName) + } else { + f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v", + inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start)) + } } else { - f.log.Debugf("vulcand/oxy/forward/http: Round trip: %v, code: %v, Length: %v, duration: %v", - inReq.URL, pw.StatusCode(), pw.GetLength(), time.Now().UTC().Sub(start)) + revproxy.ServeHTTP(w, outReq) } } diff --git a/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go b/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go index 81f916b74..fec74d26b 100644 --- a/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go +++ b/vendor/github.com/vulcand/oxy/roundrobin/rebalancer.go @@ -148,7 +148,7 @@ func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { defer logEntry.Debug("vulcand/oxy/roundrobin/rebalancer: completed ServeHttp on request") } - pw := utils.NewSimpleProxyWriter(w) + pw := utils.NewProxyWriter(w) start := rb.clock.UtcNow() // make shallow copy of request before changing anything to avoid side effects diff --git a/vendor/github.com/vulcand/oxy/utils/netutils.go b/vendor/github.com/vulcand/oxy/utils/netutils.go index e6e6eb6a4..95c30e7e5 100644 --- a/vendor/github.com/vulcand/oxy/utils/netutils.go +++ b/vendor/github.com/vulcand/oxy/utils/netutils.go @@ -12,89 +12,65 @@ import ( log "github.com/sirupsen/logrus" ) -type ProxyWriter interface { - http.ResponseWriter - GetLength() int64 - StatusCode() int - GetWriter() http.ResponseWriter -} - -// ProxyWriterWithoutCloseNotify helps to capture response headers and status code -// from the ServeHTTP. It can be safely passed to ServeHTTP handler, -// wrapping the real response writer. -type ProxyWriterWithoutCloseNotify struct { +type ProxyWriter struct { W http.ResponseWriter - Code int - Length int64 + code int + length int64 } -func NewProxyWriterWithoutCloseNotify(writer http.ResponseWriter) *ProxyWriterWithoutCloseNotify { - return &ProxyWriterWithoutCloseNotify{ +func NewProxyWriter(writer http.ResponseWriter) *ProxyWriter { + return &ProxyWriter{ W: writer, } } -func NewSimpleProxyWriter(writer http.ResponseWriter) *SimpleProxyWriter { - return &SimpleProxyWriter{ - ProxyWriterWithoutCloseNotify: NewProxyWriterWithoutCloseNotify(writer), - } -} - -type SimpleProxyWriter struct { - *ProxyWriterWithoutCloseNotify -} - -func (p *ProxyWriterWithoutCloseNotify) GetWriter() http.ResponseWriter { - return p.W -} - -func (p *ProxyWriterWithoutCloseNotify) StatusCode() int { - if p.Code == 0 { +func (p *ProxyWriter) StatusCode() int { + if p.code == 0 { // per contract standard lib will set this to http.StatusOK if not set // by user, here we avoid the confusion by mirroring this logic return http.StatusOK } - return p.Code + return p.code } -func (p *ProxyWriterWithoutCloseNotify) Header() http.Header { +func (p *ProxyWriter) GetLength() int64 { + return p.length +} + +func (p *ProxyWriter) Header() http.Header { return p.W.Header() } -func (p *ProxyWriterWithoutCloseNotify) Write(buf []byte) (int, error) { - p.Length = p.Length + int64(len(buf)) +func (p *ProxyWriter) Write(buf []byte) (int, error) { + p.length = p.length + int64(len(buf)) return p.W.Write(buf) } -func (p *ProxyWriterWithoutCloseNotify) WriteHeader(code int) { - p.Code = code +func (p *ProxyWriter) WriteHeader(code int) { + p.code = code p.W.WriteHeader(code) } -func (p *ProxyWriterWithoutCloseNotify) Flush() { +func (p *ProxyWriter) Flush() { if f, ok := p.W.(http.Flusher); ok { f.Flush() } } -func (p *ProxyWriterWithoutCloseNotify) GetLength() int64 { - return p.Length -} - -func (p *SimpleProxyWriter) CloseNotify() <-chan bool { - if cn, ok := p.GetWriter().(http.CloseNotifier); ok { +func (p *ProxyWriter) CloseNotify() <-chan bool { + if cn, ok := p.W.(http.CloseNotifier); ok { return cn.CloseNotify() } - log.Warningf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.GetWriter())) + log.Debugf("Upstream ResponseWriter of type %v does not implement http.CloseNotifier. Returning dummy channel.", reflect.TypeOf(p.W)) return make(<-chan bool) } -func (p *ProxyWriterWithoutCloseNotify) Hijack() (net.Conn, *bufio.ReadWriter, error) { +func (p *ProxyWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { if hi, ok := p.W.(http.Hijacker); ok { return hi.Hijack() } - log.Warningf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(p.W)) - return nil, nil, fmt.Errorf("The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(p.W)) + log.Debugf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(p.W)) + return nil, nil, fmt.Errorf("the response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(p.W)) } func NewBufferWriter(w io.WriteCloser) *BufferWriter { @@ -139,8 +115,8 @@ func (b *BufferWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { if hi, ok := b.W.(http.Hijacker); ok { return hi.Hijack() } - log.Warningf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(b.W)) - return nil, nil, fmt.Errorf("The response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(b.W)) + log.Debugf("Upstream ResponseWriter of type %v does not implement http.Hijacker. Returning dummy channel.", reflect.TypeOf(b.W)) + return nil, nil, fmt.Errorf("the response writer that was wrapped in this proxy, does not implement http.Hijacker. It is of type: %v", reflect.TypeOf(b.W)) } type nopWriteCloser struct {