First stage of access logging middleware. Initially without any output appenders.

This commit is contained in:
Richard Shepherd 2017-04-10 08:51:08 +01:00 committed by Timo Reimann
parent 4310bdf3ca
commit 73a1b172ed
9 changed files with 447 additions and 34 deletions

View file

@ -0,0 +1,18 @@
package accesslog
import "io"
type captureRequestReader struct {
source io.ReadCloser
count int64
}
func (r *captureRequestReader) Read(p []byte) (int, error) {
n, err := r.source.Read(p)
r.count += int64(n)
return n, err
}
func (r *captureRequestReader) Close() error {
return r.source.Close()
}

View file

@ -0,0 +1,69 @@
package accesslog
import (
"bufio"
"fmt"
"net"
"net/http"
)
var (
_ http.ResponseWriter = &captureResponseWriter{}
_ http.Hijacker = &captureResponseWriter{}
_ http.Flusher = &captureResponseWriter{}
_ http.CloseNotifier = &captureResponseWriter{}
)
// captureResponseWriter is a wrapper of type http.ResponseWriter
// that tracks request status and size
type captureResponseWriter struct {
rw http.ResponseWriter
status int
size int64
}
func (crw *captureResponseWriter) Header() http.Header {
return crw.rw.Header()
}
func (crw *captureResponseWriter) Write(b []byte) (int, error) {
if crw.status == 0 {
crw.status = http.StatusOK
}
size, err := crw.rw.Write(b)
crw.size += int64(size)
return size, err
}
func (crw *captureResponseWriter) WriteHeader(s int) {
crw.rw.WriteHeader(s)
crw.status = s
}
func (crw *captureResponseWriter) Flush() {
if f, ok := crw.rw.(http.Flusher); ok {
f.Flush()
}
}
func (crw *captureResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
if h, ok := crw.rw.(http.Hijacker); ok {
return h.Hijack()
}
return nil, nil, fmt.Errorf("Not a hijacker: %T", crw.rw)
}
func (crw *captureResponseWriter) CloseNotify() <-chan bool {
if c, ok := crw.rw.(http.CloseNotifier); ok {
return c.CloseNotify()
}
return nil
}
func (crw *captureResponseWriter) Status() int {
return crw.status
}
func (crw *captureResponseWriter) Size() int64 {
return crw.size
}

View file

@ -0,0 +1,124 @@
package accesslog
import (
"net/http"
)
const (
// StartUTC is the map key used for the time at which request processing started.
StartUTC = "StartUTC"
// StartLocal is the map key used for the local time at which request processing started.
StartLocal = "StartLocal"
// Duration is the map key used for the total time taken by processing the response, including the origin server's time but
// not the log writing time.
Duration = "Duration"
// FrontendName is the map key used for the name of the Traefik frontend.
FrontendName = "FrontendName"
// BackendName is the map key used for the name of the Traefik backend.
BackendName = "BackendName"
// BackendURL is the map key used for the URL of the Traefik backend.
BackendURL = "BackendURL"
// BackendAddr is the map key used for the IP:port of the Traefik backend (extracted from BackendURL)
BackendAddr = "BackendAddr"
// ClientAddr is the map key used for the remote address in its original form (usually IP:port).
ClientAddr = "ClientAddr"
// ClientHost is the map key used for the remote IP address from which the client request was received.
ClientHost = "ClientHost"
// ClientPort is the map key used for the remote TCP port from which the client request was received.
ClientPort = "ClientPort"
// ClientUsername is the map key used for the username provided in the URL, if present.
ClientUsername = "ClientUsername"
// RequestAddr is the map key used for the HTTP Host header (usually IP:port). This is treated as not a header by the Go API.
RequestAddr = "RequestAddr"
// RequestHost is the map key used for the HTTP Host server name (not including port).
RequestHost = "RequestHost"
// RequestPort is the map key used for the TCP port from the HTTP Host.
RequestPort = "RequestPort"
// RequestMethod is the map key used for the HTTP method.
RequestMethod = "RequestMethod"
// RequestPath is the map key used for the HTTP request URI, not including the scheme, host or port.
RequestPath = "RequestPath"
// RequestProtocol is the map key used for the version of HTTP requested.
RequestProtocol = "RequestProtocol"
// RequestLine is the original request line
RequestLine = "RequestLine"
// RequestContentSize is the map key used for the number of bytes in the request entity (a.k.a. body) sent by the client.
RequestContentSize = "RequestContentSize"
// OriginDuration is the map key used for the time taken by the origin server ('upstream') to return its response.
OriginDuration = "OriginDuration"
// OriginContentSize is the map key used for the content length specified by the origin server, or 0 if unspecified.
OriginContentSize = "OriginContentSize"
// OriginStatus is the map key used for the HTTP status code returned by the origin server.
// If the request was handled by this Traefik instance (e.g. with a redirect), then this value will be absent.
OriginStatus = "OriginStatus"
// OriginStatusLine is the map key used for the HTTP status code and corresponding descriptive string.
// If the request was handled by this Traefik instance (e.g. with a redirect), then this value will be absent.
// Note that the actual message string might be different to what is reported here, depending on server behaviour.
OriginStatusLine = "OriginStatusLine"
// DownstreamStatus is the map key used for the HTTP status code returned to the client.
DownstreamStatus = "DownstreamStatus"
// DownstreamStatusLine is the map key used for the HTTP status line returned to the client.
DownstreamStatusLine = "DownstreamStatusLine"
// DownstreamContentSize is the map key used for the number of bytes in the response entity returned to the client.
// This is in addition to the "Content-Length" header, which may be present in the origin response.
DownstreamContentSize = "DownstreamContentSize"
// RequestCount is the map key used for the number of requests received since the Traefik instance started.
RequestCount = "RequestCount"
// GzipRatio is the map key used for the response body compression ratio achieved.
GzipRatio = "GzipRatio"
// Overhead is the map key used for the processing time overhead caused by Traefik.
Overhead = "Overhead"
)
// These are written out in the default case when no config is provided to specify keys of interest.
var defaultCoreKeys = [...]string{
StartUTC,
Duration,
FrontendName,
BackendName,
BackendURL,
ClientHost,
ClientPort,
ClientUsername,
RequestHost,
RequestPort,
RequestMethod,
RequestPath,
RequestProtocol,
RequestContentSize,
OriginDuration,
OriginContentSize,
OriginStatus,
DownstreamStatus,
DownstreamContentSize,
RequestCount,
}
// This contains the set of all keys, i.e. all the default keys plus all non-default keys.
var allCoreKeys = make(map[string]struct{})
func init() {
for _, k := range defaultCoreKeys {
allCoreKeys[k] = struct{}{}
}
allCoreKeys[BackendAddr] = struct{}{}
allCoreKeys[ClientAddr] = struct{}{}
allCoreKeys[RequestAddr] = struct{}{}
allCoreKeys[RequestLine] = struct{}{}
allCoreKeys[OriginStatusLine] = struct{}{}
allCoreKeys[DownstreamStatusLine] = struct{}{}
allCoreKeys[GzipRatio] = struct{}{}
allCoreKeys[StartLocal] = struct{}{}
allCoreKeys[Overhead] = struct{}{}
}
// CoreLogData holds the fields computed from the request/response.
type CoreLogData map[string]interface{}
// LogData is the data captured by the middleware so that it can be logged.
type LogData struct {
Core CoreLogData
Request http.Header
OriginResponse http.Header
DownstreamResponse http.Header
}

View file

@ -0,0 +1,144 @@
package accesslog
import (
"context"
"fmt"
"net"
"net/http"
"net/url"
"sync/atomic"
"time"
)
type key string
const (
// DataTableKey is the key within the request context used to
// store the Log Data Table
DataTableKey key = "LogDataTable"
)
// LogHandler will write each request and its response to the access log.
// It gets some information from the logInfoResponseWriter set up by previous middleware.
// Note: Current implementation collects log data but does not have the facility to
// write anywhere.
type LogHandler struct {
}
// NewLogHandler creates a new LogHandler
func NewLogHandler() *LogHandler {
return &LogHandler{}
}
// GetLogDataTable gets the request context object that contains logging data. This accretes
// data as the request passes through the middleware chain.
func GetLogDataTable(req *http.Request) *LogData {
return req.Context().Value(DataTableKey).(*LogData)
}
func (l *LogHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http.HandlerFunc) {
now := time.Now().UTC()
core := make(CoreLogData)
logDataTable := &LogData{Core: core, Request: req.Header}
core[StartUTC] = now
core[StartLocal] = now.Local()
reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable))
var crr *captureRequestReader
if req.Body != nil {
crr = &captureRequestReader{source: req.Body, count: 0}
reqWithDataTable.Body = crr
}
core[RequestCount] = nextRequestCount()
if req.Host != "" {
core[RequestAddr] = req.Host
core[RequestHost], core[RequestPort] = silentSplitHostPort(req.Host)
}
// copy the URL without the scheme, hostname etc
urlCopy := &url.URL{
Path: req.URL.Path,
RawPath: req.URL.RawPath,
RawQuery: req.URL.RawQuery,
ForceQuery: req.URL.ForceQuery,
Fragment: req.URL.Fragment,
}
urlCopyString := urlCopy.String()
core[RequestMethod] = req.Method
core[RequestPath] = urlCopyString
core[RequestProtocol] = req.Proto
core[RequestLine] = fmt.Sprintf("%s %s %s", req.Method, urlCopyString, req.Proto)
core[ClientAddr] = req.RemoteAddr
core[ClientHost], core[ClientPort] = silentSplitHostPort(req.RemoteAddr)
core[ClientUsername] = usernameIfPresent(req.URL)
crw := &captureResponseWriter{rw: rw}
next.ServeHTTP(crw, reqWithDataTable)
logDataTable.DownstreamResponse = crw.Header()
l.logTheRoundTrip(logDataTable, crr, crw)
}
// Close closes the Logger (i.e. the file etc).
func (l *LogHandler) Close() error {
return nil
}
func silentSplitHostPort(value string) (host string, port string) {
host, port, err := net.SplitHostPort(value)
if err != nil {
return value, "-"
}
return host, port
}
func usernameIfPresent(theURL *url.URL) string {
username := "-"
if theURL.User != nil {
if name := theURL.User.Username(); name != "" {
username = name
}
}
return username
}
// Logging handler to log frontend name, backend name, and elapsed time
func (l *LogHandler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestReader, crw *captureResponseWriter) {
core := logDataTable.Core
if crr != nil {
core[RequestContentSize] = crr.count
}
core[DownstreamStatus] = crw.Status()
core[DownstreamStatusLine] = fmt.Sprintf("%03d %s", crw.Status(), http.StatusText(crw.Status()))
core[DownstreamContentSize] = crw.Size()
if original, ok := core[OriginContentSize]; ok {
o64 := original.(int64)
if o64 != crw.Size() && 0 != crw.Size() {
core[GzipRatio] = float64(o64) / float64(crw.Size())
}
}
// n.b. take care to perform time arithmetic using UTC to avoid errors at DST boundaries
total := time.Now().UTC().Sub(core[StartUTC].(time.Time))
core[Duration] = total
if origin, ok := core[OriginDuration]; ok {
core[Overhead] = total - origin.(time.Duration)
} else {
core[Overhead] = total
}
}
//-------------------------------------------------------------------------------------------------
var requestCounter uint64 // Request ID
func nextRequestCount() uint64 {
return atomic.AddUint64(&requestCounter, 1)
}

View file

@ -0,0 +1,64 @@
package accesslog
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/vulcand/oxy/utils"
)
// SaveBackend sends the backend name to the logger. These are always used with a corresponding
// SaveFrontend handler.
type SaveBackend struct {
next http.Handler
backendName string
}
// NewSaveBackend creates a SaveBackend handler.
func NewSaveBackend(next http.Handler, backendName string) http.Handler {
return &SaveBackend{next, backendName}
}
func (sb *SaveBackend) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
table := GetLogDataTable(r)
table.Core[BackendName] = sb.backendName
table.Core[BackendURL] = r.URL // note that this is *not* the original incoming URL
table.Core[BackendAddr] = r.URL.Host
crw := &captureResponseWriter{rw: rw}
start := time.Now().UTC()
sb.next.ServeHTTP(crw, r)
// use UTC to handle switchover of daylight saving correctly
table.Core[OriginDuration] = time.Now().UTC().Sub(start)
table.Core[OriginStatus] = crw.Status()
table.Core[OriginStatusLine] = fmt.Sprintf("%03d %s", crw.Status(), http.StatusText(crw.Status()))
// make copy of headers so we can ensure there is no subsequent mutation during response processing
table.OriginResponse = make(http.Header)
utils.CopyHeaders(table.OriginResponse, crw.Header())
table.Core[OriginContentSize] = crw.Size()
}
//-------------------------------------------------------------------------------------------------
// SaveFrontend sends the frontend name to the logger. These are sometimes used with a corresponding
// SaveBackend handler, but not always. For example, redirected requests don't reach a backend.
type SaveFrontend struct {
next http.Handler
frontendName string
}
// NewSaveFrontend creates a SaveFrontend handler.
func NewSaveFrontend(next http.Handler, frontendName string) http.Handler {
return &SaveFrontend{next, frontendName}
}
func (sb *SaveFrontend) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
table := GetLogDataTable(r)
table.Core[FrontendName] = strings.TrimPrefix(sb.frontendName, "frontend-")
sb.next.ServeHTTP(rw, r)
}

View file

@ -13,6 +13,7 @@ import (
"time"
"github.com/containous/traefik/log"
"github.com/containous/traefik/middlewares/accesslog"
"github.com/streamrail/concurrent-map"
)
@ -133,8 +134,9 @@ func (fblh frontendBackendLoggingHandler) ServeHTTP(rw http.ResponseWriter, req
referer := req.Referer()
agent := req.UserAgent()
frontend := strings.TrimPrefix(infoRw.GetFrontend(), "frontend-")
backend := infoRw.GetBackend()
logTable := accesslog.GetLogDataTable(req)
frontend := logTable.Core[accesslog.FrontendName]
backend := logTable.Core[accesslog.BackendURL]
status := infoRw.GetStatus()
size := infoRw.GetSize()

View file

@ -1,6 +1,7 @@
package middlewares
import (
"context"
"fmt"
"io/ioutil"
"net/http"
@ -10,6 +11,7 @@ import (
"runtime"
"testing"
"github.com/containous/traefik/middlewares/accesslog"
shellwords "github.com/mattn/go-shellwords"
"github.com/stretchr/testify/assert"
)
@ -64,7 +66,15 @@ func TestLogger(t *testing.T) {
},
}
logger.ServeHTTP(&logtestResponseWriter{}, r, LogWriterTestHandlerFunc)
// Temporary - until new access logger is fully implemented
// create the data table and populate frontend and backend
core := make(accesslog.CoreLogData)
logDataTable := &accesslog.LogData{Core: core, Request: r.Header}
logDataTable.Core[accesslog.FrontendName] = testFrontendName
logDataTable.Core[accesslog.BackendURL] = testBackendName
req := r.WithContext(context.WithValue(r.Context(), accesslog.DataTableKey, logDataTable))
logger.ServeHTTP(&logtestResponseWriter{}, req, LogWriterTestHandlerFunc)
if logdata, err := ioutil.ReadFile(logfilePath); err != nil {
fmt.Printf("%s\n%s\n", string(logdata), err.Error())

View file

@ -1,20 +0,0 @@
package middlewares
import (
"net/http"
)
// SaveBackend sends the backend name to the logger.
type SaveBackend struct {
next http.Handler
}
// NewSaveBackend creates a SaveBackend
func NewSaveBackend(next http.Handler) *SaveBackend {
return &SaveBackend{next}
}
func (sb *SaveBackend) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
saveBackendNameForLogger(r, (*r.URL).String())
sb.next.ServeHTTP(rw, r)
}

View file

@ -24,6 +24,7 @@ import (
"github.com/containous/traefik/healthcheck"
"github.com/containous/traefik/log"
"github.com/containous/traefik/middlewares"
"github.com/containous/traefik/middlewares/accesslog"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
@ -48,6 +49,7 @@ type Server struct {
currentConfigurations safe.Safe
globalConfiguration GlobalConfiguration
loggerMiddleware *middlewares.Logger
accessLoggerMiddleware *accesslog.LogHandler
routinesPool *safe.Pool
leadership *cluster.Leadership
}
@ -80,6 +82,7 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
server.currentConfigurations.Set(currentConfigurations)
server.globalConfiguration = globalConfiguration
server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile)
server.accessLoggerMiddleware = accesslog.NewLogHandler()
server.routinesPool = safe.NewPool(context.Background())
if globalConfiguration.Cluster != nil {
// leadership creation if cluster mode
@ -152,6 +155,7 @@ func (server *Server) Close() {
close(server.signals)
close(server.stopChan)
server.loggerMiddleware.Close()
server.accessLoggerMiddleware.Close()
cancel()
}
@ -176,7 +180,7 @@ func (server *Server) startHTTPServers() {
server.serverEntryPoints = server.buildEntryPoints(server.globalConfiguration)
for newServerEntryPointName, newServerEntryPoint := range server.serverEntryPoints {
serverMiddlewares := []negroni.Handler{server.loggerMiddleware, metrics}
serverMiddlewares := []negroni.Handler{server.accessLoggerMiddleware, server.loggerMiddleware, metrics}
if server.globalConfiguration.Web != nil && server.globalConfiguration.Web.Metrics != nil {
if server.globalConfiguration.Web.Metrics.Prometheus != nil {
metricsMiddleware := middlewares.NewMetricsWrapper(middlewares.NewPrometheus(newServerEntryPointName, server.globalConfiguration.Web.Metrics.Prometheus))
@ -556,7 +560,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
backendsHealthcheck := map[string]*healthcheck.BackendHealthCheck{}
backend2FrontendMap := map[string]string{}
for _, configuration := range configurations {
frontendNames := sortedFrontendNamesForConfig(configuration)
frontend:
@ -571,7 +574,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Errorf("Skipping frontend %s...", frontendName)
continue frontend
}
saveBackend := middlewares.NewSaveBackend(fwd)
if len(frontend.EntryPoints) == 0 {
log.Errorf("No entrypoint defined for frontend %s, defaultEntryPoints:%s", frontendName, globalConfiguration.DefaultEntryPoints)
log.Errorf("Skipping frontend %s...", frontendName)
@ -606,14 +608,16 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Errorf("Skipping frontend %s...", frontendName)
continue frontend
} else {
newServerRoute.route.Handler(handler)
redirectHandlers[entryPointName] = handler
saveFrontend := accesslog.NewSaveFrontend(handler, frontendName)
newServerRoute.route.Handler(saveFrontend)
redirectHandlers[entryPointName] = saveFrontend
}
} else {
if backends[frontend.Backend] == nil {
log.Debugf("Creating backend %s", frontend.Backend)
var lb http.Handler
rr, _ := roundrobin.New(saveBackend)
saveBackend := accesslog.NewSaveBackend(fwd, frontend.Backend)
saveFrontend := accesslog.NewSaveFrontend(saveBackend, frontendName)
rr, _ := roundrobin.New(saveFrontend)
if configuration.Backends[frontend.Backend] == nil {
log.Errorf("Undefined backend '%s' for frontend %s", frontend.Backend, frontendName)
log.Errorf("Skipping frontend %s...", frontendName)
@ -635,6 +639,7 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
sticky = roundrobin.NewStickySession(cookiename)
}
var lb http.Handler
switch lbMethod {
case types.Drr:
log.Debugf("Creating load-balancer drr")
@ -651,7 +656,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Errorf("Skipping frontend %s...", frontendName)
continue frontend
}
backend2FrontendMap[url.String()] = frontendName
log.Debugf("Creating server %s at %s with weight %d", serverName, url.String(), server.Weight)
if err := rebalancer.UpsertServer(url, roundrobin.Weight(server.Weight)); err != nil {
log.Errorf("Error adding server %s to load balancer: %v", server.URL, err)
@ -674,7 +678,7 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Debugf("Creating load-balancer wrr")
if stickysession {
log.Debugf("Sticky session with cookie %v", cookiename)
rr, _ = roundrobin.New(saveBackend, roundrobin.EnableStickySession(sticky))
rr, _ = roundrobin.New(saveFrontend, roundrobin.EnableStickySession(sticky))
}
lb = rr
for serverName, server := range configuration.Backends[frontend.Backend].Servers {
@ -684,7 +688,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Errorf("Skipping frontend %s...", frontendName)
continue frontend
}
backend2FrontendMap[url.String()] = frontendName
log.Debugf("Creating server %s at %s with weight %d", serverName, url.String(), server.Weight)
if err := rr.UpsertServer(url, roundrobin.Weight(server.Weight)); err != nil {
log.Errorf("Error adding server %s to load balancer: %v", server.URL, err)
@ -784,7 +787,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
}
}
healthcheck.GetHealthCheck().SetBackendsConfiguration(server.routinesPool.Ctx(), backendsHealthcheck)
middlewares.SetBackend2FrontendMap(&backend2FrontendMap)
//sort routes
for _, serverEntryPoint := range serverEntryPoints {
serverEntryPoint.httpRouter.GetHandler().SortRoutes()