From d578ed732740525d14928adb569823703d42d05d Mon Sep 17 00:00:00 2001 From: Tom Moulard Date: Mon, 12 Sep 2022 17:10:09 +0200 Subject: [PATCH] Add traffic size metrics Co-authored-by: OmarElawady Co-authored-by: Mathieu Lonjaret Co-authored-by: Romain --- cmd/traefik/traefik.go | 83 ++++++- .../content/observability/metrics/overview.md | 144 +++++++++++ pkg/metrics/datadog.go | 50 ++-- pkg/metrics/datadog_test.go | 12 + pkg/metrics/influxdb.go | 12 + pkg/metrics/influxdb2.go | 6 + pkg/metrics/influxdb2_test.go | 12 + pkg/metrics/influxdb_test.go | 24 ++ pkg/metrics/metrics.go | 79 +++++- pkg/metrics/prometheus.go | 76 ++++-- pkg/metrics/prometheus_test.go | 86 +++++++ pkg/metrics/statsd.go | 12 + pkg/metrics/statsd_test.go | 12 + .../accesslog/capture_request_reader.go | 20 -- .../accesslog/capture_response_writer.go | 83 ------- .../accesslog/capture_response_writer_test.go | 50 ---- pkg/middlewares/accesslog/field_middleware.go | 22 +- pkg/middlewares/accesslog/logger.go | 43 ++-- pkg/middlewares/accesslog/logger_test.go | 66 +++-- pkg/middlewares/capture/capture.go | 198 +++++++++++++++ pkg/middlewares/capture/capture_test.go | 234 ++++++++++++++++++ pkg/middlewares/metrics/metrics.go | 25 +- pkg/server/middleware/chainbuilder.go | 79 +----- pkg/server/router/router_test.go | 29 ++- pkg/server/routerfactory_test.go | 6 +- pkg/server/server_signals_windows.go | 1 + 26 files changed, 1125 insertions(+), 339 deletions(-) delete mode 100644 pkg/middlewares/accesslog/capture_request_reader.go delete mode 100644 pkg/middlewares/accesslog/capture_response_writer.go delete mode 100644 pkg/middlewares/accesslog/capture_response_writer_test.go create mode 100644 pkg/middlewares/capture/capture.go create mode 100644 pkg/middlewares/capture/capture_test.go diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index 0a77dd7d2..646d8e66d 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -31,6 +31,7 @@ import ( "github.com/traefik/traefik/v2/pkg/log" "github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/middlewares/accesslog" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" "github.com/traefik/traefik/v2/pkg/pilot" "github.com/traefik/traefik/v2/pkg/provider/acme" "github.com/traefik/traefik/v2/pkg/provider/aggregator" @@ -41,6 +42,8 @@ import ( "github.com/traefik/traefik/v2/pkg/server/middleware" "github.com/traefik/traefik/v2/pkg/server/service" traefiktls "github.com/traefik/traefik/v2/pkg/tls" + "github.com/traefik/traefik/v2/pkg/tracing" + "github.com/traefik/traefik/v2/pkg/tracing/jaeger" "github.com/traefik/traefik/v2/pkg/types" "github.com/traefik/traefik/v2/pkg/version" "github.com/vulcand/oxy/roundrobin" @@ -270,7 +273,10 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err // Router factory accessLog := setupAccessLog(staticConfiguration.AccessLog) - chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog) + tracer := setupTracing(staticConfiguration.Tracing) + captureMiddleware := setupCapture(staticConfiguration) + + chainBuilder := middleware.NewChainBuilder(metricsRegistry, accessLog, tracer, captureMiddleware) routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder, metricsRegistry) // Watcher @@ -504,13 +510,86 @@ func setupAccessLog(conf *types.AccessLog) *accesslog.Handler { accessLoggerMiddleware, err := accesslog.NewHandler(conf) if err != nil { - log.WithoutContext().Warnf("Unable to create access logger : %v", err) + log.WithoutContext().Warnf("Unable to create access logger: %v", err) return nil } return accessLoggerMiddleware } +func setupTracing(conf *static.Tracing) *tracing.Tracing { + if conf == nil { + return nil + } + + var backend tracing.Backend + + if conf.Jaeger != nil { + backend = conf.Jaeger + } + + if conf.Zipkin != nil { + if backend != nil { + log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Zipkin backend.") + } else { + backend = conf.Zipkin + } + } + + if conf.Datadog != nil { + if backend != nil { + log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Datadog backend.") + } else { + backend = conf.Datadog + } + } + + if conf.Instana != nil { + if backend != nil { + log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Instana backend.") + } else { + backend = conf.Instana + } + } + + if conf.Haystack != nil { + if backend != nil { + log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Haystack backend.") + } else { + backend = conf.Haystack + } + } + + if conf.Elastic != nil { + if backend != nil { + log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Elastic backend.") + } else { + backend = conf.Elastic + } + } + + if backend == nil { + log.WithoutContext().Debug("Could not initialize tracing, using Jaeger by default") + defaultBackend := &jaeger.Config{} + defaultBackend.SetDefaults() + backend = defaultBackend + } + + tracer, err := tracing.NewTracing(conf.ServiceName, conf.SpanNameLimit, backend) + if err != nil { + log.WithoutContext().Warnf("Unable to create tracer: %v", err) + return nil + } + return tracer +} + +func setupCapture(staticConfiguration *static.Configuration) *capture.Handler { + if staticConfiguration.AccessLog == nil && staticConfiguration.Metrics == nil { + return nil + } + return &capture.Handler{} +} + func configureLogging(staticConfiguration *static.Configuration) { // configure default log flags stdlog.SetFlags(stdlog.Lshortfile | stdlog.LstdFlags) diff --git a/docs/content/observability/metrics/overview.md b/docs/content/observability/metrics/overview.md index c0615560e..9a69c40b2 100644 --- a/docs/content/observability/metrics/overview.md +++ b/docs/content/observability/metrics/overview.md @@ -94,6 +94,8 @@ traefik_tls_certs_not_after | [HTTPS Requests Count](#https-requests-count) | ✓ | ✓ | ✓ | ✓ | | [Request Duration Histogram](#request-duration-histogram) | ✓ | ✓ | ✓ | ✓ | | [Open Connections Count](#open-connections-count) | ✓ | ✓ | ✓ | ✓ | +| [Requests Bytes Count](#requests-bytes-count) | ✓ | ✓ | ✓ | ✓ | +| [Responses Bytes Count](#responses-bytes-count) | ✓ | ✓ | ✓ | ✓ | ### HTTP Requests Count @@ -187,6 +189,52 @@ traefik_entrypoint_open_connections {prefix}.entrypoint.connections.open ``` +### Requests Bytes Count + +The total size of HTTP requests in bytes handled by an entrypoint. + +[Labels](#labels): `code`, `method`, `protocol`, `entrypoint`. + +```dd tab="Datadog" +entrypoint.requests.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.entrypoint.requests.bytes.total +``` + +```prom tab="Prometheus" +traefik_entrypoint_requests_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.entrypoint.requests.bytes.total +``` + +### Responses Bytes Count + +The total size of HTTP responses in bytes handled by an entrypoint. + +[Labels](#labels): `code`, `method`, `protocol`, `entrypoint`. + +```dd tab="Datadog" +entrypoint.responses.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.entrypoint.responses.bytes.total +``` + +```prom tab="Prometheus" +traefik_entrypoint_responses_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.entrypoint.responses.bytes.total +``` + ## Router Metrics | Metric | DataDog | InfluxDB / InfluxDB2 | Prometheus | StatsD | @@ -195,6 +243,8 @@ traefik_entrypoint_open_connections | [HTTPS Requests Count](#https-requests-count_1) | ✓ | ✓ | ✓ | ✓ | | [Request Duration Histogram](#request-duration-histogram_1) | ✓ | ✓ | ✓ | ✓ | | [Open Connections Count](#open-connections-count_1) | ✓ | ✓ | ✓ | ✓ | +| [Requests Bytes Count](#requests-bytes-count_1) | ✓ | ✓ | ✓ | ✓ | +| [Responses Bytes Count](#responses-bytes-count_1) | ✓ | ✓ | ✓ | ✓ | ### HTTP Requests Count @@ -288,6 +338,52 @@ traefik_router_open_connections {prefix}.router.connections.open ``` +### Requests Bytes Count + +The total size of HTTP requests in bytes handled by a router. + +[Labels](#labels): `code`, `method`, `protocol`, `router`, `service`. + +```dd tab="Datadog" +router.requests.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.router.requests.bytes.total +``` + +```prom tab="Prometheus" +traefik_router_requests_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.router.requests.bytes.total +``` + +### Responses Bytes Count + +The total size of HTTP responses in bytes handled by a router. + +[Labels](#labels): `code`, `method`, `protocol`, `router`, `service`. + +```dd tab="Datadog" +router.responses.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.router.responses.bytes.total +``` + +```prom tab="Prometheus" +traefik_router_responses_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.router.responses.bytes.total +``` + ## Service Metrics | Metric | DataDog | InfluxDB / InfluxDB2 | Prometheus | StatsD | @@ -298,6 +394,8 @@ traefik_router_open_connections | [Open Connections Count](#open-connections-count_2) | ✓ | ✓ | ✓ | ✓ | | [Requests Retries Count](#requests-retries-count) | ✓ | ✓ | ✓ | ✓ | | [Service Server UP](#service-server-up) | ✓ | ✓ | ✓ | ✓ | +| [Requests Bytes Count](#requests-bytes-count_2) | ✓ | ✓ | ✓ | ✓ | +| [Responses outgoing traffic](#responses-bytes-count_2) | ✓ | ✓ | ✓ | ✓ | ### HTTP Requests Count @@ -437,6 +535,52 @@ traefik_service_server_up {prefix}.service.server.up ``` +### Requests Bytes Count + +The total size of requests in bytes received by a service. + +[Labels](#labels): `code`, `method`, `protocol`, `service`. + +```dd tab="Datadog" +service.requests.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.service.requests.bytes.total +``` + +```prom tab="Prometheus" +traefik_service_requests_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.service.requests.bytes.total +``` + +### Responses Bytes Count + +The total size of responses in bytes returned by a service. + +[Labels](#labels): `code`, `method`, `protocol`, `service`. + +```dd tab="Datadog" +service.responses.bytes.total +``` + +```influxdb tab="InfluxDB / InfluxDB2" +traefik.service.responses.bytes.total +``` + +```prom tab="Prometheus" +traefik_service_responses_bytes_total +``` + +```statsd tab="StatsD" +# Default prefix: "traefik" +{prefix}.service.responses.bytes.total +``` + ## Labels Here is a comprehensive list of labels that are provided by the metrics: diff --git a/pkg/metrics/datadog.go b/pkg/metrics/datadog.go index 6aeafddf6..596bc700d 100644 --- a/pkg/metrics/datadog.go +++ b/pkg/metrics/datadog.go @@ -28,18 +28,24 @@ const ( ddEntryPointReqsTLSName = "entrypoint.request.tls.total" ddEntryPointReqDurationName = "entrypoint.request.duration" ddEntryPointOpenConnsName = "entrypoint.connections.open" + ddEntryPointReqsBytesName = "entrypoint.requests.bytes.total" + ddEntryPointRespsBytesName = "entrypoint.responses.bytes.total" - ddMetricsRouterReqsName = "router.request.total" - ddMetricsRouterReqsTLSName = "router.request.tls.total" - ddMetricsRouterReqsDurationName = "router.request.duration" - ddRouterOpenConnsName = "router.connections.open" + ddRouterReqsName = "router.request.total" + ddRouterReqsTLSName = "router.request.tls.total" + ddRouterReqsDurationName = "router.request.duration" + ddRouterOpenConnsName = "router.connections.open" + ddRouterReqsBytesName = "router.requests.bytes.total" + ddRouterRespsBytesName = "router.responses.bytes.total" - ddMetricsServiceReqsName = "service.request.total" - ddMetricsServiceReqsTLSName = "service.request.tls.total" - ddMetricsServiceReqsDurationName = "service.request.duration" - ddRetriesTotalName = "service.retries.total" - ddOpenConnsName = "service.connections.open" - ddServerUpName = "service.server.up" + ddServiceReqsName = "service.request.total" + ddServiceReqsTLSName = "service.request.tls.total" + ddServiceReqsDurationName = "service.request.duration" + ddServiceRetriesName = "service.retries.total" + ddServiceOpenConnsName = "service.connections.open" + ddServiceServerUpName = "service.server.up" + ddServiceReqsBytesName = "service.requests.bytes.total" + ddServiceRespsBytesName = "service.responses.bytes.total" ) // RegisterDatadog registers the metrics pusher if this didn't happen yet and creates a datadog Registry instance. @@ -73,24 +79,30 @@ func RegisterDatadog(ctx context.Context, config *types.Datadog) Registry { registry.entryPointReqsTLSCounter = datadogClient.NewCounter(ddEntryPointReqsTLSName, 1.0) registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddEntryPointReqDurationName, 1.0), time.Second) registry.entryPointOpenConnsGauge = datadogClient.NewGauge(ddEntryPointOpenConnsName) + registry.entryPointReqsBytesCounter = datadogClient.NewCounter(ddEntryPointReqsBytesName, 1.0) + registry.entryPointRespsBytesCounter = datadogClient.NewCounter(ddEntryPointRespsBytesName, 1.0) } if config.AddRoutersLabels { registry.routerEnabled = config.AddRoutersLabels - registry.routerReqsCounter = datadogClient.NewCounter(ddMetricsRouterReqsName, 1.0) - registry.routerReqsTLSCounter = datadogClient.NewCounter(ddMetricsRouterReqsTLSName, 1.0) - registry.routerReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddMetricsRouterReqsDurationName, 1.0), time.Second) + registry.routerReqsCounter = datadogClient.NewCounter(ddRouterReqsName, 1.0) + registry.routerReqsTLSCounter = datadogClient.NewCounter(ddRouterReqsTLSName, 1.0) + registry.routerReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddRouterReqsDurationName, 1.0), time.Second) registry.routerOpenConnsGauge = datadogClient.NewGauge(ddRouterOpenConnsName) + registry.routerReqsBytesCounter = datadogClient.NewCounter(ddRouterReqsBytesName, 1.0) + registry.routerRespsBytesCounter = datadogClient.NewCounter(ddRouterRespsBytesName, 1.0) } if config.AddServicesLabels { registry.svcEnabled = config.AddServicesLabels - registry.serviceReqsCounter = datadogClient.NewCounter(ddMetricsServiceReqsName, 1.0) - registry.serviceReqsTLSCounter = datadogClient.NewCounter(ddMetricsServiceReqsTLSName, 1.0) - registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddMetricsServiceReqsDurationName, 1.0), time.Second) - registry.serviceRetriesCounter = datadogClient.NewCounter(ddRetriesTotalName, 1.0) - registry.serviceOpenConnsGauge = datadogClient.NewGauge(ddOpenConnsName) - registry.serviceServerUpGauge = datadogClient.NewGauge(ddServerUpName) + registry.serviceReqsCounter = datadogClient.NewCounter(ddServiceReqsName, 1.0) + registry.serviceReqsTLSCounter = datadogClient.NewCounter(ddServiceReqsTLSName, 1.0) + registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddServiceReqsDurationName, 1.0), time.Second) + registry.serviceRetriesCounter = datadogClient.NewCounter(ddServiceRetriesName, 1.0) + registry.serviceOpenConnsGauge = datadogClient.NewGauge(ddServiceOpenConnsName) + registry.serviceServerUpGauge = datadogClient.NewGauge(ddServiceServerUpName) + registry.serviceReqsBytesCounter = datadogClient.NewCounter(ddServiceReqsBytesName, 1.0) + registry.serviceRespsBytesCounter = datadogClient.NewCounter(ddServiceRespsBytesName, 1.0) } return registry diff --git a/pkg/metrics/datadog_test.go b/pkg/metrics/datadog_test.go index a9fac44c8..7a36c1d77 100644 --- a/pkg/metrics/datadog_test.go +++ b/pkg/metrics/datadog_test.go @@ -55,12 +55,16 @@ func testDatadogRegistry(t *testing.T, metricsPrefix string, datadogRegistry Reg metricsPrefix + ".entrypoint.request.tls.total:1.000000|c|#entrypoint:test,tls_version:foo,tls_cipher:bar\n", metricsPrefix + ".entrypoint.request.duration:10000.000000|h|#entrypoint:test\n", metricsPrefix + ".entrypoint.connections.open:1.000000|g|#entrypoint:test\n", + metricsPrefix + ".entrypoint.requests.bytes.total:1.000000|c|#entrypoint:test\n", + metricsPrefix + ".entrypoint.responses.bytes.total:1.000000|c|#entrypoint:test\n", metricsPrefix + ".router.request.total:1.000000|c|#router:demo,service:test,code:404,method:GET\n", metricsPrefix + ".router.request.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n", metricsPrefix + ".router.request.tls.total:1.000000|c|#router:demo,service:test,tls_version:foo,tls_cipher:bar\n", metricsPrefix + ".router.request.duration:10000.000000|h|#router:demo,service:test,code:200\n", metricsPrefix + ".router.connections.open:1.000000|g|#router:demo,service:test\n", + metricsPrefix + ".router.requests.bytes.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n", + metricsPrefix + ".router.responses.bytes.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n", metricsPrefix + ".service.request.total:1.000000|c|#service:test,code:404,method:GET\n", metricsPrefix + ".service.request.total:1.000000|c|#service:test,code:200,method:GET\n", @@ -70,6 +74,8 @@ func testDatadogRegistry(t *testing.T, metricsPrefix string, datadogRegistry Reg metricsPrefix + ".service.retries.total:2.000000|c|#service:test\n", metricsPrefix + ".service.request.duration:10000.000000|h|#service:test,code:200\n", metricsPrefix + ".service.server.up:1.000000|g|#service:test,url:http://127.0.0.1,one:two\n", + metricsPrefix + ".service.requests.bytes.total:1.000000|c|#service:test,code:200,method:GET\n", + metricsPrefix + ".service.responses.bytes.total:1.000000|c|#service:test,code:200,method:GET\n", } udp.ShouldReceiveAll(t, expected, func() { @@ -84,12 +90,16 @@ func testDatadogRegistry(t *testing.T, metricsPrefix string, datadogRegistry Reg datadogRegistry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) datadogRegistry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000) datadogRegistry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1) + datadogRegistry.EntryPointReqsBytesCounter().With("entrypoint", "test").Add(1) + datadogRegistry.EntryPointRespsBytesCounter().With("entrypoint", "test").Add(1) datadogRegistry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) datadogRegistry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) datadogRegistry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) datadogRegistry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) datadogRegistry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1) + datadogRegistry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + datadogRegistry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) datadogRegistry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) datadogRegistry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) @@ -99,5 +109,7 @@ func testDatadogRegistry(t *testing.T, metricsPrefix string, datadogRegistry Reg datadogRegistry.ServiceRetriesCounter().With("service", "test").Add(1) datadogRegistry.ServiceRetriesCounter().With("service", "test").Add(1) datadogRegistry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1", "one", "two").Set(1) + datadogRegistry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + datadogRegistry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) }) } diff --git a/pkg/metrics/influxdb.go b/pkg/metrics/influxdb.go index 47a0f2962..f9f5485b7 100644 --- a/pkg/metrics/influxdb.go +++ b/pkg/metrics/influxdb.go @@ -33,11 +33,15 @@ const ( influxDBEntryPointReqsTLSName = "traefik.entrypoint.requests.tls.total" influxDBEntryPointReqDurationName = "traefik.entrypoint.request.duration" influxDBEntryPointOpenConnsName = "traefik.entrypoint.connections.open" + influxDBEntryPointReqsBytesName = "traefik.entrypoint.requests.bytes.total" + influxDBEntryPointRespsBytesName = "traefik.entrypoint.responses.bytes.total" influxDBRouterReqsName = "traefik.router.requests.total" influxDBRouterReqsTLSName = "traefik.router.requests.tls.total" influxDBRouterReqsDurationName = "traefik.router.request.duration" influxDBORouterOpenConnsName = "traefik.router.connections.open" + influxDBRouterReqsBytesName = "traefik.router.requests.bytes.total" + influxDBRouterRespsBytesName = "traefik.router.responses.bytes.total" influxDBServiceReqsName = "traefik.service.requests.total" influxDBServiceReqsTLSName = "traefik.service.requests.tls.total" @@ -45,6 +49,8 @@ const ( influxDBServiceRetriesTotalName = "traefik.service.retries.total" influxDBServiceOpenConnsName = "traefik.service.connections.open" influxDBServiceServerUpName = "traefik.service.server.up" + influxDBServiceReqsBytesName = "traefik.service.requests.bytes.total" + influxDBServiceRespsBytesName = "traefik.service.responses.bytes.total" ) const ( @@ -75,6 +81,8 @@ func RegisterInfluxDB(ctx context.Context, config *types.InfluxDB) Registry { registry.entryPointReqsTLSCounter = influxDBClient.NewCounter(influxDBEntryPointReqsTLSName) registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDBClient.NewHistogram(influxDBEntryPointReqDurationName), time.Second) registry.entryPointOpenConnsGauge = influxDBClient.NewGauge(influxDBEntryPointOpenConnsName) + registry.entryPointReqsBytesCounter = influxDBClient.NewCounter(influxDBEntryPointReqsBytesName) + registry.entryPointRespsBytesCounter = influxDBClient.NewCounter(influxDBEntryPointRespsBytesName) } if config.AddRoutersLabels { @@ -83,6 +91,8 @@ func RegisterInfluxDB(ctx context.Context, config *types.InfluxDB) Registry { registry.routerReqsTLSCounter = influxDBClient.NewCounter(influxDBRouterReqsTLSName) registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDBClient.NewHistogram(influxDBRouterReqsDurationName), time.Second) registry.routerOpenConnsGauge = influxDBClient.NewGauge(influxDBORouterOpenConnsName) + registry.routerReqsBytesCounter = influxDBClient.NewCounter(influxDBRouterReqsBytesName) + registry.routerRespsBytesCounter = influxDBClient.NewCounter(influxDBRouterRespsBytesName) } if config.AddServicesLabels { @@ -93,6 +103,8 @@ func RegisterInfluxDB(ctx context.Context, config *types.InfluxDB) Registry { registry.serviceRetriesCounter = influxDBClient.NewCounter(influxDBServiceRetriesTotalName) registry.serviceOpenConnsGauge = influxDBClient.NewGauge(influxDBServiceOpenConnsName) registry.serviceServerUpGauge = influxDBClient.NewGauge(influxDBServiceServerUpName) + registry.serviceReqsBytesCounter = influxDBClient.NewCounter(influxDBServiceReqsBytesName) + registry.serviceRespsBytesCounter = influxDBClient.NewCounter(influxDBServiceRespsBytesName) } return registry diff --git a/pkg/metrics/influxdb2.go b/pkg/metrics/influxdb2.go index 9ca2caa94..27be18f2a 100644 --- a/pkg/metrics/influxdb2.go +++ b/pkg/metrics/influxdb2.go @@ -65,6 +65,8 @@ func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry { registry.entryPointReqsTLSCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsTLSName) registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBEntryPointReqDurationName), time.Second) registry.entryPointOpenConnsGauge = influxDB2Store.NewGauge(influxDBEntryPointOpenConnsName) + registry.entryPointReqsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsBytesName) + registry.entryPointRespsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointRespsBytesName) } if config.AddRoutersLabels { @@ -73,6 +75,8 @@ func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry { registry.routerReqsTLSCounter = influxDB2Store.NewCounter(influxDBRouterReqsTLSName) registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBRouterReqsDurationName), time.Second) registry.routerOpenConnsGauge = influxDB2Store.NewGauge(influxDBORouterOpenConnsName) + registry.routerReqsBytesCounter = influxDB2Store.NewCounter(influxDBRouterReqsBytesName) + registry.routerRespsBytesCounter = influxDB2Store.NewCounter(influxDBRouterRespsBytesName) } if config.AddServicesLabels { @@ -83,6 +87,8 @@ func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry { registry.serviceRetriesCounter = influxDB2Store.NewCounter(influxDBServiceRetriesTotalName) registry.serviceOpenConnsGauge = influxDB2Store.NewGauge(influxDBServiceOpenConnsName) registry.serviceServerUpGauge = influxDB2Store.NewGauge(influxDBServiceServerUpName) + registry.serviceReqsBytesCounter = influxDB2Store.NewCounter(influxDBServiceReqsBytesName) + registry.serviceRespsBytesCounter = influxDB2Store.NewCounter(influxDBServiceRespsBytesName) } return registry diff --git a/pkg/metrics/influxdb2_test.go b/pkg/metrics/influxdb2_test.go index 272814b79..3fc628d67 100644 --- a/pkg/metrics/influxdb2_test.go +++ b/pkg/metrics/influxdb2_test.go @@ -73,12 +73,16 @@ func TestInfluxDB2(t *testing.T) { `(traefik\.entrypoint\.requests\.tls\.total,entrypoint=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.entrypoint\.connections\.open,entrypoint=test value=1) [\d]{19}`, + `(traefik\.entrypoint\.requests\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`, + `(traefik\.entrypoint\.responses\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`, } influxDB2Registry.EntryPointReqsCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) influxDB2Registry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDB2Registry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000) influxDB2Registry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1) + influxDB2Registry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDB2Registry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgEntrypoint := <-c assertMessage(t, *msgEntrypoint, expectedEntrypoint) @@ -89,6 +93,8 @@ func TestInfluxDB2(t *testing.T) { `(traefik\.router\.requests\.tls\.total,router=demo,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.router\.request\.duration,code=200,router=demo,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.router\.connections\.open,router=demo,service=test value=1) [\d]{19}`, + `(traefik\.router\.requests\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`, + `(traefik\.router\.responses\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`, } influxDB2Registry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) @@ -96,6 +102,8 @@ func TestInfluxDB2(t *testing.T) { influxDB2Registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDB2Registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) influxDB2Registry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1) + influxDB2Registry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDB2Registry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgRouter := <-c assertMessage(t, *msgRouter, expectedRouter) @@ -106,6 +114,8 @@ func TestInfluxDB2(t *testing.T) { `(traefik\.service\.requests\.tls\.total,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.service\.request\.duration,code=200,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.service\.server\.up,service=test,url=http://127.0.0.1 value=1) [\d]{19}`, + `(traefik\.service\.requests\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`, + `(traefik\.service\.responses\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`, } influxDB2Registry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) @@ -113,6 +123,8 @@ func TestInfluxDB2(t *testing.T) { influxDB2Registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDB2Registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) influxDB2Registry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1) + influxDB2Registry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDB2Registry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgService := <-c assertMessage(t, *msgService, expectedService) diff --git a/pkg/metrics/influxdb_test.go b/pkg/metrics/influxdb_test.go index 6889ffa9b..38a146697 100644 --- a/pkg/metrics/influxdb_test.go +++ b/pkg/metrics/influxdb_test.go @@ -69,6 +69,8 @@ func TestInfluxDB(t *testing.T) { `(traefik\.entrypoint\.requests\.tls\.total,entrypoint=test,tag1=val1,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test,tag1=val1 p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.entrypoint\.connections\.open,entrypoint=test,tag1=val1 value=1) [\d]{19}`, + `(traefik\.entrypoint\.requests\.bytes\.total,code=200,entrypoint=test,method=GET,tag1=val1 count=1) [\d]{19}`, + `(traefik\.entrypoint\.responses\.bytes\.total,code=200,entrypoint=test,method=GET,tag1=val1 count=1) [\d]{19}`, } msgEntrypoint := udp.ReceiveString(t, func() { @@ -76,6 +78,8 @@ func TestInfluxDB(t *testing.T) { influxDBRegistry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDBRegistry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000) influxDBRegistry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1) + influxDBRegistry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) }) assertMessage(t, msgEntrypoint, expectedEntrypoint) @@ -86,6 +90,8 @@ func TestInfluxDB(t *testing.T) { `(traefik\.router\.requests\.tls\.total,router=demo,service=test,tag1=val1,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.router\.request\.duration,code=200,router=demo,service=test,tag1=val1 p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.router\.connections\.open,router=demo,service=test,tag1=val1 value=1) [\d]{19}`, + `(traefik\.router\.requests\.bytes\.total,code=200,method=GET,router=demo,service=test,tag1=val1 count=1) [\d]{19}`, + `(traefik\.router\.responses\.bytes\.total,code=200,method=GET,router=demo,service=test,tag1=val1 count=1) [\d]{19}`, } msgRouter := udp.ReceiveString(t, func() { @@ -94,6 +100,8 @@ func TestInfluxDB(t *testing.T) { influxDBRegistry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDBRegistry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) influxDBRegistry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1) + influxDBRegistry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) }) assertMessage(t, msgRouter, expectedRouter) @@ -106,6 +114,8 @@ func TestInfluxDB(t *testing.T) { `(traefik\.service\.retries\.total(?:,code=[\d]{3},method=GET)?,service=test,tag1=val1 count=2) [\d]{19}`, `(traefik\.service\.server\.up,service=test,tag1=val1,url=http://127.0.0.1 value=1) [\d]{19}`, `(traefik\.service\.connections\.open,service=test,tag1=val1 value=1) [\d]{19}`, + `(traefik\.service\.requests\.bytes\.total,code=200,method=GET,service=test,tag1=val1 count=1) [\d]{19}`, + `(traefik\.service\.responses\.bytes\.total,code=200,method=GET,service=test,tag1=val1 count=1) [\d]{19}`, } msgService := udp.ReceiveString(t, func() { @@ -117,6 +127,8 @@ func TestInfluxDB(t *testing.T) { influxDBRegistry.ServiceRetriesCounter().With("service", "test").Add(1) influxDBRegistry.ServiceRetriesCounter().With("service", "test").Add(1) influxDBRegistry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1) + influxDBRegistry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) }) assertMessage(t, msgService, expectedService) @@ -181,12 +193,16 @@ func TestInfluxDBHTTP(t *testing.T) { `(traefik\.entrypoint\.requests\.tls\.total,entrypoint=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.entrypoint\.connections\.open,entrypoint=test value=1) [\d]{19}`, + `(traefik\.entrypoint\.requests\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`, + `(traefik\.entrypoint\.responses\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`, } influxDBRegistry.EntryPointReqsCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) influxDBRegistry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDBRegistry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000) influxDBRegistry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1) + influxDBRegistry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgEntrypoint := <-c assertMessage(t, *msgEntrypoint, expectedEntrypoint) @@ -197,6 +213,8 @@ func TestInfluxDBHTTP(t *testing.T) { `(traefik\.router\.requests\.tls\.total,router=demo,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`, `(traefik\.router\.request\.duration,code=200,router=demo,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, `(traefik\.router\.connections\.open,router=demo,service=test value=1) [\d]{19}`, + `(traefik\.router\.requests\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`, + `(traefik\.router\.responses\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`, } influxDBRegistry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) @@ -204,6 +222,8 @@ func TestInfluxDBHTTP(t *testing.T) { influxDBRegistry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) influxDBRegistry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) influxDBRegistry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1) + influxDBRegistry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgRouter := <-c assertMessage(t, *msgRouter, expectedRouter) @@ -216,6 +236,8 @@ func TestInfluxDBHTTP(t *testing.T) { `(traefik\.service\.retries\.total(?:,code=[\d]{3},method=GET)?,service=test count=2) [\d]{19}`, `(traefik\.service\.server\.up,service=test,url=http://127.0.0.1 value=1) [\d]{19}`, `(traefik\.service\.connections\.open,service=test value=1) [\d]{19}`, + `(traefik\.service\.requests\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`, + `(traefik\.service\.responses\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`, } influxDBRegistry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) @@ -226,6 +248,8 @@ func TestInfluxDBHTTP(t *testing.T) { influxDBRegistry.ServiceRetriesCounter().With("service", "test").Add(1) influxDBRegistry.ServiceRetriesCounter().With("service", "test").Add(1) influxDBRegistry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1) + influxDBRegistry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) msgService := <-c assertMessage(t, *msgService, expectedService) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 2698df768..ac49d389a 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -36,6 +36,8 @@ type Registry interface { EntryPointReqsTLSCounter() metrics.Counter EntryPointReqDurationHistogram() ScalableHistogram EntryPointOpenConnsGauge() metrics.Gauge + EntryPointReqsBytesCounter() metrics.Counter + EntryPointRespsBytesCounter() metrics.Counter // router metrics @@ -43,6 +45,8 @@ type Registry interface { RouterReqsTLSCounter() metrics.Counter RouterReqDurationHistogram() ScalableHistogram RouterOpenConnsGauge() metrics.Gauge + RouterReqsBytesCounter() metrics.Counter + RouterRespsBytesCounter() metrics.Counter // service metrics @@ -52,6 +56,8 @@ type Registry interface { ServiceOpenConnsGauge() metrics.Gauge ServiceRetriesCounter() metrics.Counter ServiceServerUpGauge() metrics.Gauge + ServiceReqsBytesCounter() metrics.Counter + ServiceRespsBytesCounter() metrics.Counter } // NewVoidRegistry is a noop implementation of metrics.Registry. @@ -62,7 +68,7 @@ func NewVoidRegistry() Registry { // NewMultiRegistry is an implementation of metrics.Registry that wraps multiple registries. // It handles the case when a registry hasn't registered some metric and returns nil. -// This allows for feature imparity between the different metric implementations. +// This allows for feature disparity between the different metric implementations. func NewMultiRegistry(registries []Registry) Registry { var configReloadsCounter []metrics.Counter var configReloadsFailureCounter []metrics.Counter @@ -73,16 +79,22 @@ func NewMultiRegistry(registries []Registry) Registry { var entryPointReqsTLSCounter []metrics.Counter var entryPointReqDurationHistogram []ScalableHistogram var entryPointOpenConnsGauge []metrics.Gauge + var entryPointReqsBytesCounter []metrics.Counter + var entryPointRespsBytesCounter []metrics.Counter var routerReqsCounter []metrics.Counter var routerReqsTLSCounter []metrics.Counter var routerReqDurationHistogram []ScalableHistogram var routerOpenConnsGauge []metrics.Gauge + var routerReqsBytesCounter []metrics.Counter + var routerRespsBytesCounter []metrics.Counter var serviceReqsCounter []metrics.Counter var serviceReqsTLSCounter []metrics.Counter var serviceReqDurationHistogram []ScalableHistogram var serviceOpenConnsGauge []metrics.Gauge var serviceRetriesCounter []metrics.Counter var serviceServerUpGauge []metrics.Gauge + var serviceReqsBytesCounter []metrics.Counter + var serviceRespsBytesCounter []metrics.Counter for _, r := range registries { if r.ConfigReloadsCounter() != nil { @@ -112,6 +124,12 @@ func NewMultiRegistry(registries []Registry) Registry { if r.EntryPointOpenConnsGauge() != nil { entryPointOpenConnsGauge = append(entryPointOpenConnsGauge, r.EntryPointOpenConnsGauge()) } + if r.EntryPointReqsBytesCounter() != nil { + entryPointReqsBytesCounter = append(entryPointReqsBytesCounter, r.EntryPointReqsBytesCounter()) + } + if r.EntryPointRespsBytesCounter() != nil { + entryPointRespsBytesCounter = append(entryPointRespsBytesCounter, r.EntryPointRespsBytesCounter()) + } if r.RouterReqsCounter() != nil { routerReqsCounter = append(routerReqsCounter, r.RouterReqsCounter()) } @@ -124,6 +142,12 @@ func NewMultiRegistry(registries []Registry) Registry { if r.RouterOpenConnsGauge() != nil { routerOpenConnsGauge = append(routerOpenConnsGauge, r.RouterOpenConnsGauge()) } + if r.RouterReqsBytesCounter() != nil { + routerReqsBytesCounter = append(routerReqsBytesCounter, r.RouterReqsBytesCounter()) + } + if r.RouterRespsBytesCounter() != nil { + routerRespsBytesCounter = append(routerRespsBytesCounter, r.RouterRespsBytesCounter()) + } if r.ServiceReqsCounter() != nil { serviceReqsCounter = append(serviceReqsCounter, r.ServiceReqsCounter()) } @@ -142,6 +166,12 @@ func NewMultiRegistry(registries []Registry) Registry { if r.ServiceServerUpGauge() != nil { serviceServerUpGauge = append(serviceServerUpGauge, r.ServiceServerUpGauge()) } + if r.ServiceReqsBytesCounter() != nil { + serviceReqsBytesCounter = append(serviceReqsBytesCounter, r.ServiceReqsBytesCounter()) + } + if r.ServiceRespsBytesCounter() != nil { + serviceRespsBytesCounter = append(serviceRespsBytesCounter, r.ServiceRespsBytesCounter()) + } } return &standardRegistry{ @@ -155,18 +185,24 @@ func NewMultiRegistry(registries []Registry) Registry { tlsCertsNotAfterTimestampGauge: multi.NewGauge(tlsCertsNotAfterTimestampGauge...), entryPointReqsCounter: multi.NewCounter(entryPointReqsCounter...), entryPointReqsTLSCounter: multi.NewCounter(entryPointReqsTLSCounter...), - entryPointReqDurationHistogram: NewMultiHistogram(entryPointReqDurationHistogram...), + entryPointReqDurationHistogram: MultiHistogram(entryPointReqDurationHistogram), entryPointOpenConnsGauge: multi.NewGauge(entryPointOpenConnsGauge...), + entryPointReqsBytesCounter: multi.NewCounter(entryPointReqsBytesCounter...), + entryPointRespsBytesCounter: multi.NewCounter(entryPointRespsBytesCounter...), routerReqsCounter: multi.NewCounter(routerReqsCounter...), routerReqsTLSCounter: multi.NewCounter(routerReqsTLSCounter...), - routerReqDurationHistogram: NewMultiHistogram(routerReqDurationHistogram...), + routerReqDurationHistogram: MultiHistogram(routerReqDurationHistogram), routerOpenConnsGauge: multi.NewGauge(routerOpenConnsGauge...), + routerReqsBytesCounter: multi.NewCounter(routerReqsBytesCounter...), + routerRespsBytesCounter: multi.NewCounter(routerRespsBytesCounter...), serviceReqsCounter: multi.NewCounter(serviceReqsCounter...), serviceReqsTLSCounter: multi.NewCounter(serviceReqsTLSCounter...), - serviceReqDurationHistogram: NewMultiHistogram(serviceReqDurationHistogram...), + serviceReqDurationHistogram: MultiHistogram(serviceReqDurationHistogram), serviceOpenConnsGauge: multi.NewGauge(serviceOpenConnsGauge...), serviceRetriesCounter: multi.NewCounter(serviceRetriesCounter...), serviceServerUpGauge: multi.NewGauge(serviceServerUpGauge...), + serviceReqsBytesCounter: multi.NewCounter(serviceReqsBytesCounter...), + serviceRespsBytesCounter: multi.NewCounter(serviceRespsBytesCounter...), } } @@ -183,16 +219,22 @@ type standardRegistry struct { entryPointReqsTLSCounter metrics.Counter entryPointReqDurationHistogram ScalableHistogram entryPointOpenConnsGauge metrics.Gauge + entryPointReqsBytesCounter metrics.Counter + entryPointRespsBytesCounter metrics.Counter routerReqsCounter metrics.Counter routerReqsTLSCounter metrics.Counter routerReqDurationHistogram ScalableHistogram routerOpenConnsGauge metrics.Gauge + routerReqsBytesCounter metrics.Counter + routerRespsBytesCounter metrics.Counter serviceReqsCounter metrics.Counter serviceReqsTLSCounter metrics.Counter serviceReqDurationHistogram ScalableHistogram serviceOpenConnsGauge metrics.Gauge serviceRetriesCounter metrics.Counter serviceServerUpGauge metrics.Gauge + serviceReqsBytesCounter metrics.Counter + serviceRespsBytesCounter metrics.Counter } func (r *standardRegistry) IsEpEnabled() bool { @@ -243,6 +285,14 @@ func (r *standardRegistry) EntryPointOpenConnsGauge() metrics.Gauge { return r.entryPointOpenConnsGauge } +func (r *standardRegistry) EntryPointReqsBytesCounter() metrics.Counter { + return r.entryPointReqsBytesCounter +} + +func (r *standardRegistry) EntryPointRespsBytesCounter() metrics.Counter { + return r.entryPointRespsBytesCounter +} + func (r *standardRegistry) RouterReqsCounter() metrics.Counter { return r.routerReqsCounter } @@ -259,6 +309,14 @@ func (r *standardRegistry) RouterOpenConnsGauge() metrics.Gauge { return r.routerOpenConnsGauge } +func (r *standardRegistry) RouterReqsBytesCounter() metrics.Counter { + return r.routerReqsBytesCounter +} + +func (r *standardRegistry) RouterRespsBytesCounter() metrics.Counter { + return r.routerRespsBytesCounter +} + func (r *standardRegistry) ServiceReqsCounter() metrics.Counter { return r.serviceReqsCounter } @@ -283,6 +341,14 @@ func (r *standardRegistry) ServiceServerUpGauge() metrics.Gauge { return r.serviceServerUpGauge } +func (r *standardRegistry) ServiceReqsBytesCounter() metrics.Counter { + return r.serviceReqsBytesCounter +} + +func (r *standardRegistry) ServiceRespsBytesCounter() metrics.Counter { + return r.serviceRespsBytesCounter +} + // ScalableHistogram is a Histogram with a predefined time unit, // used when producing observations without explicitly setting the observed value. type ScalableHistogram interface { @@ -335,11 +401,6 @@ func NewHistogramWithScale(histogram metrics.Histogram, unit time.Duration) (Sca // MultiHistogram collects multiple individual histograms and treats them as a unit. type MultiHistogram []ScalableHistogram -// NewMultiHistogram returns a multi-histogram, wrapping the passed histograms. -func NewMultiHistogram(h ...ScalableHistogram) MultiHistogram { - return MultiHistogram(h) -} - // ObserveFromStart implements ScalableHistogram. func (h MultiHistogram) ObserveFromStart(start time.Time) { for _, histogram := range h { diff --git a/pkg/metrics/prometheus.go b/pkg/metrics/prometheus.go index 7f2f6114e..324864087 100644 --- a/pkg/metrics/prometheus.go +++ b/pkg/metrics/prometheus.go @@ -32,27 +32,33 @@ const ( tlsCertsNotAfterTimestamp = metricsTLSPrefix + "certs_not_after" // entry point. - metricEntryPointPrefix = MetricNamePrefix + "entrypoint_" - entryPointReqsTotalName = metricEntryPointPrefix + "requests_total" - entryPointReqsTLSTotalName = metricEntryPointPrefix + "requests_tls_total" - entryPointReqDurationName = metricEntryPointPrefix + "request_duration_seconds" - entryPointOpenConnsName = metricEntryPointPrefix + "open_connections" + metricEntryPointPrefix = MetricNamePrefix + "entrypoint_" + entryPointReqsTotalName = metricEntryPointPrefix + "requests_total" + entryPointReqsTLSTotalName = metricEntryPointPrefix + "requests_tls_total" + entryPointReqDurationName = metricEntryPointPrefix + "request_duration_seconds" + entryPointOpenConnsName = metricEntryPointPrefix + "open_connections" + entryPointReqsBytesTotalName = metricEntryPointPrefix + "requests_bytes_total" + entryPointRespsBytesTotalName = metricEntryPointPrefix + "responses_bytes_total" // router level. - metricRouterPrefix = MetricNamePrefix + "router_" - routerReqsTotalName = metricRouterPrefix + "requests_total" - routerReqsTLSTotalName = metricRouterPrefix + "requests_tls_total" - routerReqDurationName = metricRouterPrefix + "request_duration_seconds" - routerOpenConnsName = metricRouterPrefix + "open_connections" + metricRouterPrefix = MetricNamePrefix + "router_" + routerReqsTotalName = metricRouterPrefix + "requests_total" + routerReqsTLSTotalName = metricRouterPrefix + "requests_tls_total" + routerReqDurationName = metricRouterPrefix + "request_duration_seconds" + routerOpenConnsName = metricRouterPrefix + "open_connections" + routerReqsBytesTotalName = metricRouterPrefix + "requests_bytes_total" + routerRespsBytesTotalName = metricRouterPrefix + "responses_bytes_total" // service level. - metricServicePrefix = MetricNamePrefix + "service_" - serviceReqsTotalName = metricServicePrefix + "requests_total" - serviceReqsTLSTotalName = metricServicePrefix + "requests_tls_total" - serviceReqDurationName = metricServicePrefix + "request_duration_seconds" - serviceOpenConnsName = metricServicePrefix + "open_connections" - serviceRetriesTotalName = metricServicePrefix + "retries_total" - serviceServerUpName = metricServicePrefix + "server_up" + metricServicePrefix = MetricNamePrefix + "service_" + serviceReqsTotalName = metricServicePrefix + "requests_total" + serviceReqsTLSTotalName = metricServicePrefix + "requests_tls_total" + serviceReqDurationName = metricServicePrefix + "request_duration_seconds" + serviceOpenConnsName = metricServicePrefix + "open_connections" + serviceRetriesTotalName = metricServicePrefix + "retries_total" + serviceServerUpName = metricServicePrefix + "server_up" + serviceReqsBytesTotalName = metricServicePrefix + "requests_bytes_total" + serviceRespsBytesTotalName = metricServicePrefix + "responses_bytes_total" ) // promState holds all metric state internally and acts as the only Collector we register for Prometheus. @@ -166,18 +172,30 @@ func initStandardRegistry(config *types.Prometheus) Registry { Name: entryPointOpenConnsName, Help: "How many open connections exist on an entrypoint, partitioned by method and protocol.", }, []string{"method", "protocol", "entrypoint"}) + entryPointReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: entryPointReqsBytesTotalName, + Help: "The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.", + }, []string{"code", "method", "protocol", "entrypoint"}) + entryPointRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: entryPointRespsBytesTotalName, + Help: "The total size of responses in bytes handled by an entrypoint, partitioned by status code, protocol, and method.", + }, []string{"code", "method", "protocol", "entrypoint"}) promState.vectors = append(promState.vectors, entryPointReqs.cv, entryPointReqsTLS.cv, entryPointReqDurations.hv, entryPointOpenConns.gv, + entryPointReqsBytesTotal.cv, + entryPointRespsBytesTotal.cv, ) reg.entryPointReqsCounter = entryPointReqs reg.entryPointReqsTLSCounter = entryPointReqsTLS reg.entryPointReqDurationHistogram, _ = NewHistogramWithScale(entryPointReqDurations, time.Second) reg.entryPointOpenConnsGauge = entryPointOpenConns + reg.entryPointReqsBytesCounter = entryPointReqsBytesTotal + reg.entryPointRespsBytesCounter = entryPointRespsBytesTotal } if config.AddRoutersLabels { @@ -198,17 +216,29 @@ func initStandardRegistry(config *types.Prometheus) Registry { Name: routerOpenConnsName, Help: "How many open connections exist on a router, partitioned by service, method, and protocol.", }, []string{"method", "protocol", "router", "service"}) + routerReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: routerReqsBytesTotalName, + Help: "The total size of requests in bytes handled by a router, partitioned by service, status code, protocol, and method.", + }, []string{"code", "method", "protocol", "router", "service"}) + routerRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: routerRespsBytesTotalName, + Help: "The total size of responses in bytes handled by a router, partitioned by service, status code, protocol, and method.", + }, []string{"code", "method", "protocol", "router", "service"}) promState.vectors = append(promState.vectors, routerReqs.cv, routerReqsTLS.cv, routerReqDurations.hv, routerOpenConns.gv, + routerReqsBytesTotal.cv, + routerRespsBytesTotal.cv, ) reg.routerReqsCounter = routerReqs reg.routerReqsTLSCounter = routerReqsTLS reg.routerReqDurationHistogram, _ = NewHistogramWithScale(routerReqDurations, time.Second) reg.routerOpenConnsGauge = routerOpenConns + reg.routerReqsBytesCounter = routerReqsBytesTotal + reg.routerRespsBytesCounter = routerRespsBytesTotal } if config.AddServicesLabels { @@ -237,6 +267,14 @@ func initStandardRegistry(config *types.Prometheus) Registry { Name: serviceServerUpName, Help: "service server is up, described by gauge value of 0 or 1.", }, []string{"service", "url"}) + serviceReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: serviceReqsBytesTotalName, + Help: "The total size of requests in bytes received by a service, partitioned by status code, protocol, and method.", + }, []string{"code", "method", "protocol", "service"}) + serviceRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{ + Name: serviceRespsBytesTotalName, + Help: "The total size of responses in bytes returned by a service, partitioned by status code, protocol, and method.", + }, []string{"code", "method", "protocol", "service"}) promState.vectors = append(promState.vectors, serviceReqs.cv, @@ -245,6 +283,8 @@ func initStandardRegistry(config *types.Prometheus) Registry { serviceOpenConns.gv, serviceRetries.cv, serviceServerUp.gv, + serviceReqsBytesTotal.cv, + serviceRespsBytesTotal.cv, ) reg.serviceReqsCounter = serviceReqs @@ -253,6 +293,8 @@ func initStandardRegistry(config *types.Prometheus) Registry { reg.serviceOpenConnsGauge = serviceOpenConns reg.serviceRetriesCounter = serviceRetries reg.serviceServerUpGauge = serviceServerUp + reg.serviceReqsBytesCounter = serviceReqsBytesTotal + reg.serviceRespsBytesCounter = serviceRespsBytesTotal } return reg diff --git a/pkg/metrics/prometheus_test.go b/pkg/metrics/prometheus_test.go index 30627c3bd..744a7615d 100644 --- a/pkg/metrics/prometheus_test.go +++ b/pkg/metrics/prometheus_test.go @@ -121,6 +121,14 @@ func TestPrometheus(t *testing.T) { EntryPointOpenConnsGauge(). With("method", http.MethodGet, "protocol", "http", "entrypoint", "http"). Set(1) + prometheusRegistry. + EntryPointRespsBytesCounter(). + With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http"). + Add(1) + prometheusRegistry. + EntryPointReqsBytesCounter(). + With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http"). + Add(1) prometheusRegistry. RouterReqsCounter(). @@ -138,6 +146,14 @@ func TestPrometheus(t *testing.T) { RouterOpenConnsGauge(). With("router", "demo", "service", "service1", "method", http.MethodGet, "protocol", "http"). Set(1) + prometheusRegistry. + RouterRespsBytesCounter(). + With("router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Add(1) + prometheusRegistry. + RouterReqsBytesCounter(). + With("router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Add(1) prometheusRegistry. ServiceReqsCounter(). @@ -163,6 +179,14 @@ func TestPrometheus(t *testing.T) { ServiceServerUpGauge(). With("service", "service1", "url", "http://127.0.0.10:80"). Set(1) + prometheusRegistry. + ServiceRespsBytesCounter(). + With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Add(1) + prometheusRegistry. + ServiceReqsBytesCounter(). + With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http"). + Add(1) delayForTrackingCompletion() @@ -227,6 +251,26 @@ func TestPrometheus(t *testing.T) { }, assert: buildGaugeAssert(t, entryPointOpenConnsName, 1), }, + { + name: entryPointReqsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "entrypoint": "http", + }, + assert: buildCounterAssert(t, entryPointReqsBytesTotalName, 1), + }, + { + name: entryPointRespsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "entrypoint": "http", + }, + assert: buildCounterAssert(t, entryPointRespsBytesTotalName, 1), + }, { name: routerReqsTotalName, labels: map[string]string{ @@ -269,6 +313,28 @@ func TestPrometheus(t *testing.T) { }, assert: buildGaugeAssert(t, routerOpenConnsName, 1), }, + { + name: routerReqsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + "router": "demo", + }, + assert: buildCounterAssert(t, routerReqsBytesTotalName, 1), + }, + { + name: routerRespsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + "router": "demo", + }, + assert: buildCounterAssert(t, routerRespsBytesTotalName, 1), + }, { name: serviceReqsTotalName, labels: map[string]string{ @@ -322,6 +388,26 @@ func TestPrometheus(t *testing.T) { }, assert: buildGaugeAssert(t, serviceServerUpName, 1), }, + { + name: serviceReqsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + }, + assert: buildCounterAssert(t, serviceReqsBytesTotalName, 1), + }, + { + name: serviceRespsBytesTotalName, + labels: map[string]string{ + "code": "200", + "method": http.MethodGet, + "protocol": "http", + "service": "service1", + }, + assert: buildCounterAssert(t, serviceRespsBytesTotalName, 1), + }, } for _, test := range testCases { diff --git a/pkg/metrics/statsd.go b/pkg/metrics/statsd.go index 90b356876..c84a7cf5a 100644 --- a/pkg/metrics/statsd.go +++ b/pkg/metrics/statsd.go @@ -28,11 +28,15 @@ const ( statsdEntryPointReqsTLSName = "entrypoint.request.tls.total" statsdEntryPointReqDurationName = "entrypoint.request.duration" statsdEntryPointOpenConnsName = "entrypoint.connections.open" + statsdEntryPointReqsBytesName = "entrypoint.requests.bytes.total" + statsdEntryPointRespsBytesName = "entrypoint.responses.bytes.total" statsdRouterReqsName = "router.request.total" statsdRouterReqsTLSName = "router.request.tls.total" statsdRouterReqsDurationName = "router.request.duration" statsdRouterOpenConnsName = "router.connections.open" + statsdRouterReqsBytesName = "router.requests.bytes.total" + statsdRouterRespsBytesName = "router.responses.bytes.total" statsdServiceReqsName = "service.request.total" statsdServiceReqsTLSName = "service.request.tls.total" @@ -40,6 +44,8 @@ const ( statsdServiceRetriesTotalName = "service.retries.total" statsdServiceServerUpName = "service.server.up" statsdServiceOpenConnsName = "service.connections.open" + statsdServiceReqsBytesName = "service.requests.bytes.total" + statsdServiceRespsBytesName = "service.responses.bytes.total" ) // RegisterStatsd registers the metrics pusher if this didn't happen yet and creates a statsd Registry instance. @@ -72,6 +78,8 @@ func RegisterStatsd(ctx context.Context, config *types.Statsd) Registry { registry.entryPointReqsTLSCounter = statsdClient.NewCounter(statsdEntryPointReqsTLSName, 1.0) registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(statsdClient.NewTiming(statsdEntryPointReqDurationName, 1.0), time.Millisecond) registry.entryPointOpenConnsGauge = statsdClient.NewGauge(statsdEntryPointOpenConnsName) + registry.entryPointReqsBytesCounter = statsdClient.NewCounter(statsdEntryPointReqsBytesName, 1.0) + registry.entryPointRespsBytesCounter = statsdClient.NewCounter(statsdEntryPointRespsBytesName, 1.0) } if config.AddRoutersLabels { @@ -80,6 +88,8 @@ func RegisterStatsd(ctx context.Context, config *types.Statsd) Registry { registry.routerReqsTLSCounter = statsdClient.NewCounter(statsdRouterReqsTLSName, 1.0) registry.routerReqDurationHistogram, _ = NewHistogramWithScale(statsdClient.NewTiming(statsdRouterReqsDurationName, 1.0), time.Millisecond) registry.routerOpenConnsGauge = statsdClient.NewGauge(statsdRouterOpenConnsName) + registry.routerReqsBytesCounter = statsdClient.NewCounter(statsdRouterReqsBytesName, 1.0) + registry.routerRespsBytesCounter = statsdClient.NewCounter(statsdRouterRespsBytesName, 1.0) } if config.AddServicesLabels { @@ -90,6 +100,8 @@ func RegisterStatsd(ctx context.Context, config *types.Statsd) Registry { registry.serviceRetriesCounter = statsdClient.NewCounter(statsdServiceRetriesTotalName, 1.0) registry.serviceOpenConnsGauge = statsdClient.NewGauge(statsdServiceOpenConnsName) registry.serviceServerUpGauge = statsdClient.NewGauge(statsdServiceServerUpName) + registry.serviceReqsBytesCounter = statsdClient.NewCounter(statsdServiceReqsBytesName, 1.0) + registry.serviceRespsBytesCounter = statsdClient.NewCounter(statsdServiceRespsBytesName, 1.0) } return registry diff --git a/pkg/metrics/statsd_test.go b/pkg/metrics/statsd_test.go index 63de0abf6..3eb8e4705 100644 --- a/pkg/metrics/statsd_test.go +++ b/pkg/metrics/statsd_test.go @@ -59,11 +59,15 @@ func testRegistry(t *testing.T, metricsPrefix string, registry Registry) { metricsPrefix + ".entrypoint.request.tls.total:1.000000|c\n", metricsPrefix + ".entrypoint.request.duration:10000.000000|ms", metricsPrefix + ".entrypoint.connections.open:1.000000|g\n", + metricsPrefix + ".entrypoint.requests.bytes.total:1.000000|c\n", + metricsPrefix + ".entrypoint.responses.bytes.total:1.000000|c\n", metricsPrefix + ".router.request.total:2.000000|c\n", metricsPrefix + ".router.request.tls.total:1.000000|c\n", metricsPrefix + ".router.request.duration:10000.000000|ms", metricsPrefix + ".router.connections.open:1.000000|g\n", + metricsPrefix + ".router.requests.bytes.total:1.000000|c\n", + metricsPrefix + ".router.responses.bytes.total:1.000000|c\n", metricsPrefix + ".service.request.total:2.000000|c\n", metricsPrefix + ".service.request.tls.total:1.000000|c\n", @@ -71,6 +75,8 @@ func testRegistry(t *testing.T, metricsPrefix string, registry Registry) { metricsPrefix + ".service.connections.open:1.000000|g\n", metricsPrefix + ".service.retries.total:2.000000|c\n", metricsPrefix + ".service.server.up:1.000000|g\n", + metricsPrefix + ".service.requests.bytes.total:1.000000|c\n", + metricsPrefix + ".service.responses.bytes.total:1.000000|c\n", } udp.ShouldReceiveAll(t, expected, func() { @@ -85,12 +91,16 @@ func testRegistry(t *testing.T, metricsPrefix string, registry Registry) { registry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) registry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000) registry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1) + registry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + registry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) registry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) registry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1) registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) registry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1) + registry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + registry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) registry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) registry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) @@ -100,5 +110,7 @@ func testRegistry(t *testing.T, metricsPrefix string, registry Registry) { registry.ServiceRetriesCounter().With("service", "test").Add(1) registry.ServiceRetriesCounter().With("service", "test").Add(1) registry.ServiceServerUpGauge().With("service:test", "url", "http://127.0.0.1").Set(1) + registry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + registry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) }) } diff --git a/pkg/middlewares/accesslog/capture_request_reader.go b/pkg/middlewares/accesslog/capture_request_reader.go deleted file mode 100644 index 7ff090667..000000000 --- a/pkg/middlewares/accesslog/capture_request_reader.go +++ /dev/null @@ -1,20 +0,0 @@ -package accesslog - -import "io" - -type captureRequestReader struct { - // source ReadCloser from where the request body is read. - source io.ReadCloser - // count Counts the number of bytes read (when captureRequestReader.Read is called). - count int64 -} - -func (r *captureRequestReader) Read(p []byte) (int, error) { - n, err := r.source.Read(p) - r.count += int64(n) - return n, err -} - -func (r *captureRequestReader) Close() error { - return r.source.Close() -} diff --git a/pkg/middlewares/accesslog/capture_response_writer.go b/pkg/middlewares/accesslog/capture_response_writer.go deleted file mode 100644 index d8b0c6fdb..000000000 --- a/pkg/middlewares/accesslog/capture_response_writer.go +++ /dev/null @@ -1,83 +0,0 @@ -package accesslog - -import ( - "bufio" - "fmt" - "net" - "net/http" - - "github.com/traefik/traefik/v2/pkg/middlewares" -) - -var _ middlewares.Stateful = &captureResponseWriterWithCloseNotify{} - -type capturer interface { - http.ResponseWriter - Size() int64 - Status() int -} - -func newCaptureResponseWriter(rw http.ResponseWriter) capturer { - capt := &captureResponseWriter{rw: rw} - if _, ok := rw.(http.CloseNotifier); !ok { - return capt - } - return &captureResponseWriterWithCloseNotify{capt} -} - -// captureResponseWriter is a wrapper of type http.ResponseWriter -// that tracks request status and size. -type captureResponseWriter struct { - rw http.ResponseWriter - status int - size int64 -} - -type captureResponseWriterWithCloseNotify struct { - *captureResponseWriter -} - -// CloseNotify returns a channel that receives at most a -// single value (true) when the client connection has gone away. -func (r *captureResponseWriterWithCloseNotify) CloseNotify() <-chan bool { - return r.rw.(http.CloseNotifier).CloseNotify() -} - -func (crw *captureResponseWriter) Header() http.Header { - return crw.rw.Header() -} - -func (crw *captureResponseWriter) Write(b []byte) (int, error) { - if crw.status == 0 { - crw.status = http.StatusOK - } - size, err := crw.rw.Write(b) - crw.size += int64(size) - return size, err -} - -func (crw *captureResponseWriter) WriteHeader(s int) { - crw.rw.WriteHeader(s) - crw.status = s -} - -func (crw *captureResponseWriter) Flush() { - if f, ok := crw.rw.(http.Flusher); ok { - f.Flush() - } -} - -func (crw *captureResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { - if h, ok := crw.rw.(http.Hijacker); ok { - return h.Hijack() - } - return nil, nil, fmt.Errorf("not a hijacker: %T", crw.rw) -} - -func (crw *captureResponseWriter) Status() int { - return crw.status -} - -func (crw *captureResponseWriter) Size() int64 { - return crw.size -} diff --git a/pkg/middlewares/accesslog/capture_response_writer_test.go b/pkg/middlewares/accesslog/capture_response_writer_test.go deleted file mode 100644 index 3606fc033..000000000 --- a/pkg/middlewares/accesslog/capture_response_writer_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package accesslog - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/assert" -) - -type rwWithCloseNotify struct { - *httptest.ResponseRecorder -} - -func (r *rwWithCloseNotify) CloseNotify() <-chan bool { - panic("implement me") -} - -func TestCloseNotifier(t *testing.T) { - testCases := []struct { - rw http.ResponseWriter - desc string - implementsCloseNotifier bool - }{ - { - rw: httptest.NewRecorder(), - desc: "does not implement CloseNotifier", - implementsCloseNotifier: false, - }, - { - rw: &rwWithCloseNotify{httptest.NewRecorder()}, - desc: "implements CloseNotifier", - implementsCloseNotifier: true, - }, - } - - for _, test := range testCases { - test := test - t.Run(test.desc, func(t *testing.T) { - t.Parallel() - - _, ok := test.rw.(http.CloseNotifier) - assert.Equal(t, test.implementsCloseNotifier, ok) - - rw := newCaptureResponseWriter(test.rw) - _, impl := rw.(http.CloseNotifier) - assert.Equal(t, test.implementsCloseNotifier, impl) - }) - } -} diff --git a/pkg/middlewares/accesslog/field_middleware.go b/pkg/middlewares/accesslog/field_middleware.go index c65dcc052..c4182ab78 100644 --- a/pkg/middlewares/accesslog/field_middleware.go +++ b/pkg/middlewares/accesslog/field_middleware.go @@ -4,6 +4,8 @@ import ( "net/http" "time" + "github.com/traefik/traefik/v2/pkg/log" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" "github.com/vulcand/oxy/utils" ) @@ -49,16 +51,24 @@ func AddServiceFields(rw http.ResponseWriter, req *http.Request, next http.Handl // AddOriginFields add origin fields. func AddOriginFields(rw http.ResponseWriter, req *http.Request, next http.Handler, data *LogData) { - crw := newCaptureResponseWriter(rw) start := time.Now().UTC() - next.ServeHTTP(crw, req) + next.ServeHTTP(rw, req) // use UTC to handle switchover of daylight saving correctly data.Core[OriginDuration] = time.Now().UTC().Sub(start) - data.Core[OriginStatus] = crw.Status() - // make copy of headers so we can ensure there is no subsequent mutation during response processing + // make copy of headers, so we can ensure there is no subsequent mutation + // during response processing data.OriginResponse = make(http.Header) - utils.CopyHeaders(data.OriginResponse, crw.Header()) - data.Core[OriginContentSize] = crw.Size() + utils.CopyHeaders(data.OriginResponse, rw.Header()) + + ctx := req.Context() + capt, err := capture.FromContext(ctx) + if err != nil { + log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).Errorf("Could not get Capture: %v", err) + return + } + + data.Core[OriginStatus] = capt.StatusCode() + data.Core[OriginContentSize] = capt.ResponseSize() } diff --git a/pkg/middlewares/accesslog/logger.go b/pkg/middlewares/accesslog/logger.go index a6cfea67d..841728582 100644 --- a/pkg/middlewares/accesslog/logger.go +++ b/pkg/middlewares/accesslog/logger.go @@ -19,6 +19,7 @@ import ( "github.com/sirupsen/logrus" ptypes "github.com/traefik/paerser/types" "github.com/traefik/traefik/v2/pkg/log" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" traefiktls "github.com/traefik/traefik/v2/pkg/tls" "github.com/traefik/traefik/v2/pkg/types" ) @@ -182,13 +183,17 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http }, } - reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable)) + defer func() { + if h.config.BufferingSize > 0 { + h.logHandlerChan <- handlerParams{ + logDataTable: logDataTable, + } + return + } + h.logTheRoundTrip(logDataTable) + }() - var crr *captureRequestReader - if req.Body != nil { - crr = &captureRequestReader{source: req.Body, count: 0} - reqWithDataTable.Body = crr - } + reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable)) core[RequestCount] = nextRequestCount() if req.Host != "" { @@ -222,30 +227,26 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http core[ClientHost] = forwardedFor } - crw := newCaptureResponseWriter(rw) - - next.ServeHTTP(crw, reqWithDataTable) + next.ServeHTTP(rw, reqWithDataTable) if _, ok := core[ClientUsername]; !ok { core[ClientUsername] = usernameIfPresent(reqWithDataTable.URL) } logDataTable.DownstreamResponse = downstreamResponse{ - headers: crw.Header().Clone(), - status: crw.Status(), - size: crw.Size(), - } - if crr != nil { - logDataTable.Request.size = crr.count + headers: rw.Header().Clone(), } - if h.config.BufferingSize > 0 { - h.logHandlerChan <- handlerParams{ - logDataTable: logDataTable, - } - } else { - h.logTheRoundTrip(logDataTable) + ctx := req.Context() + capt, err := capture.FromContext(ctx) + if err != nil { + log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).Errorf("Could not get Capture: %v", err) + return } + + logDataTable.DownstreamResponse.status = capt.StatusCode() + logDataTable.DownstreamResponse.size = capt.ResponseSize() + logDataTable.Request.size = capt.RequestSize() } // Close closes the Logger (i.e. the file, drain logHandlerChan, etc). diff --git a/pkg/middlewares/accesslog/logger_test.go b/pkg/middlewares/accesslog/logger_test.go index 751380018..b7b3b565e 100644 --- a/pkg/middlewares/accesslog/logger_test.go +++ b/pkg/middlewares/accesslog/logger_test.go @@ -1,9 +1,11 @@ package accesslog import ( + "bytes" "crypto/tls" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -14,9 +16,11 @@ import ( "testing" "time" + "github.com/containous/alice" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ptypes "github.com/traefik/paerser/types" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" "github.com/traefik/traefik/v2/pkg/types" ) @@ -46,23 +50,29 @@ func TestLogRotation(t *testing.T) { config := &types.AccessLog{FilePath: fileName, Format: CommonFormat} logHandler, err := NewHandler(config) - if err != nil { - t.Fatalf("Error creating new log handler: %s", err) - } - defer logHandler.Close() + require.NoError(t, err) + t.Cleanup(func() { + err := logHandler.Close() + require.NoError(t, err) + }) + + chain := alice.New() + chain = chain.Append(capture.WrapHandler(&capture.Handler{})) + chain = chain.Append(WrapHandler(logHandler)) + handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusOK) + })) + require.NoError(t, err) recorder := httptest.NewRecorder() req := httptest.NewRequest(http.MethodGet, "http://localhost", nil) - next := func(rw http.ResponseWriter, req *http.Request) { - rw.WriteHeader(http.StatusOK) - } iterations := 20 halfDone := make(chan bool) writeDone := make(chan bool) go func() { for i := 0; i < iterations; i++ { - logHandler.ServeHTTP(recorder, req, http.HandlerFunc(next)) + handler.ServeHTTP(recorder, req) if i == iterations/2 { halfDone <- true } @@ -178,7 +188,10 @@ func TestLoggerHeaderFields(t *testing.T) { logger, err := NewHandler(config) require.NoError(t, err) - defer logger.Close() + t.Cleanup(func() { + err := logger.Close() + require.NoError(t, err) + }) if config.FilePath != "" { _, err = os.Stat(config.FilePath) @@ -196,9 +209,14 @@ func TestLoggerHeaderFields(t *testing.T) { req.Header.Add(test.header, s) } - logger.ServeHTTP(httptest.NewRecorder(), req, http.HandlerFunc(func(writer http.ResponseWriter, r *http.Request) { - writer.WriteHeader(http.StatusOK) + chain := alice.New() + chain = chain.Append(capture.WrapHandler(&capture.Handler{})) + chain = chain.Append(WrapHandler(logger)) + handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(http.StatusOK) })) + require.NoError(t, err) + handler.ServeHTTP(httptest.NewRecorder(), req) logData, err := os.ReadFile(logFile.Name()) require.NoError(t, err) @@ -224,11 +242,14 @@ func TestLoggerCLF(t *testing.T) { assertValidLogData(t, expectedLog, logData) } -func TestAsyncLoggerCLF(t *testing.T) { +func TestLoggerCLFWithBufferingSize(t *testing.T) { logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix) config := &types.AccessLog{FilePath: logFilePath, Format: CommonFormat, BufferingSize: 1024} doLogging(t, config) + // wait a bit for the buffer to be written in the file. + time.Sleep(50 * time.Millisecond) + logData, err := os.ReadFile(logFilePath) require.NoError(t, err) @@ -691,9 +712,7 @@ func assertValidLogData(t *testing.T, expected string, logData []byte) { resultExpected, err := ParseAccessLog(expected) require.NoError(t, err) - formatErrMessage := fmt.Sprintf(` - Expected: %s - Actual: %s`, expected, string(logData)) + formatErrMessage := fmt.Sprintf("Expected:\t%q\nActual:\t%q", expected, string(logData)) require.Equal(t, len(resultExpected), len(result), formatErrMessage) assert.Equal(t, resultExpected[ClientHost], result[ClientHost], formatErrMessage) @@ -722,7 +741,7 @@ func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) { restoreStdout = func() { os.Stdout = original - os.RemoveAll(file.Name()) + _ = os.RemoveAll(file.Name()) } return file, restoreStdout @@ -730,10 +749,12 @@ func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) { func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) { t.Helper() - logger, err := NewHandler(config) require.NoError(t, err) - defer logger.Close() + t.Cleanup(func() { + err := logger.Close() + require.NoError(t, err) + }) if config.FilePath != "" { _, err = os.Stat(config.FilePath) @@ -753,6 +774,7 @@ func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) { User: url.UserPassword(testUsername, ""), Path: testPath, }, + Body: io.NopCloser(bytes.NewReader([]byte("bar"))), } if enableTLS { req.TLS = &tls.ConnectionState{ @@ -761,7 +783,13 @@ func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) { } } - logger.ServeHTTP(httptest.NewRecorder(), req, http.HandlerFunc(logWriterTestHandlerFunc)) + chain := alice.New() + chain = chain.Append(capture.WrapHandler(&capture.Handler{})) + chain = chain.Append(WrapHandler(logger)) + handler, err := chain.Then(http.HandlerFunc(logWriterTestHandlerFunc)) + require.NoError(t, err) + + handler.ServeHTTP(httptest.NewRecorder(), req) } func doLoggingTLS(t *testing.T, config *types.AccessLog) { diff --git a/pkg/middlewares/capture/capture.go b/pkg/middlewares/capture/capture.go new file mode 100644 index 000000000..6ee0ed399 --- /dev/null +++ b/pkg/middlewares/capture/capture.go @@ -0,0 +1,198 @@ +// Package capture is a middleware that captures requests/responses size, and status. +// +// For another middleware to get those attributes of a request/response, this middleware +// should be added before in the middleware chain. +// +// handler, _ := NewHandler() +// chain := alice.New(). +// Append(WrapHandler(handler)). +// Append(myOtherMiddleware). +// then(...) +// +// As this middleware stores those data in the request's context, the data can +// be retrieved at anytime after the ServerHTTP. +// +// func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http.Handler) { +// capt, err := capture.FromContext(req.Context()) +// if err != nil { +// ... +// } +// +// fmt.Println(capt.Status()) +// fmt.Println(capt.ResponseSize()) +// fmt.Println(capt.RequestSize()) +// } +package capture + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + + "github.com/containous/alice" + "github.com/traefik/traefik/v2/pkg/middlewares" +) + +type key string + +const capturedData key = "capturedData" + +// Handler will store each request data to its context. +type Handler struct{} + +// WrapHandler wraps capture handler into an Alice Constructor. +func WrapHandler(handler *Handler) alice.Constructor { + return func(next http.Handler) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + handler.ServeHTTP(rw, req, next) + }), nil + } +} + +func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http.Handler) { + c := Capture{} + if req.Body != nil { + readCounter := &readCounter{source: req.Body} + c.rr = readCounter + req.Body = readCounter + } + responseWriter := newResponseWriter(rw) + c.rw = responseWriter + ctx := context.WithValue(req.Context(), capturedData, &c) + next.ServeHTTP(responseWriter, req.WithContext(ctx)) +} + +// Capture is the object populated by the capture middleware, +// allowing to gather information about the request and response. +type Capture struct { + rr *readCounter + rw responseWriter +} + +// FromContext returns the Capture value found in ctx, or an empty Capture otherwise. +func FromContext(ctx context.Context) (*Capture, error) { + c := ctx.Value(capturedData) + if c == nil { + return nil, errors.New("value not found") + } + capt, ok := c.(*Capture) + if !ok { + return nil, errors.New("value stored in Context is not a *Capture") + } + return capt, nil +} + +func (c Capture) ResponseSize() int64 { + return c.rw.Size() +} + +func (c Capture) StatusCode() int { + return c.rw.Status() +} + +// RequestSize returns the size of the request's body if it applies, +// zero otherwise. +func (c Capture) RequestSize() int64 { + if c.rr == nil { + return 0 + } + return c.rr.size +} + +type readCounter struct { + // source ReadCloser from where the request body is read. + source io.ReadCloser + // size is total the number of bytes read. + size int64 +} + +func (r *readCounter) Read(p []byte) (int, error) { + n, err := r.source.Read(p) + r.size += int64(n) + return n, err +} + +func (r *readCounter) Close() error { + return r.source.Close() +} + +var _ middlewares.Stateful = &responseWriterWithCloseNotify{} + +type responseWriter interface { + http.ResponseWriter + Size() int64 + Status() int +} + +func newResponseWriter(rw http.ResponseWriter) responseWriter { + capt := &captureResponseWriter{rw: rw} + if _, ok := rw.(http.CloseNotifier); !ok { + return capt + } + + return &responseWriterWithCloseNotify{capt} +} + +// captureResponseWriter is a wrapper of type http.ResponseWriter +// that tracks response status and size. +type captureResponseWriter struct { + rw http.ResponseWriter + status int + size int64 +} + +func (crw *captureResponseWriter) Header() http.Header { + return crw.rw.Header() +} + +func (crw *captureResponseWriter) Size() int64 { + return crw.size +} + +func (crw *captureResponseWriter) Status() int { + return crw.status +} + +func (crw *captureResponseWriter) Write(b []byte) (int, error) { + if crw.status == 0 { + crw.status = http.StatusOK + } + + size, err := crw.rw.Write(b) + crw.size += int64(size) + + return size, err +} + +func (crw *captureResponseWriter) WriteHeader(s int) { + crw.rw.WriteHeader(s) + crw.status = s +} + +func (crw *captureResponseWriter) Flush() { + if f, ok := crw.rw.(http.Flusher); ok { + f.Flush() + } +} + +func (crw *captureResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + if h, ok := crw.rw.(http.Hijacker); ok { + return h.Hijack() + } + + return nil, nil, fmt.Errorf("not a hijacker: %T", crw.rw) +} + +type responseWriterWithCloseNotify struct { + *captureResponseWriter +} + +// CloseNotify returns a channel that receives at most a +// single value (true) when the client connection has gone away. +func (r *responseWriterWithCloseNotify) CloseNotify() <-chan bool { + return r.rw.(http.CloseNotifier).CloseNotify() +} diff --git a/pkg/middlewares/capture/capture_test.go b/pkg/middlewares/capture/capture_test.go new file mode 100644 index 000000000..f6718ad6e --- /dev/null +++ b/pkg/middlewares/capture/capture_test.go @@ -0,0 +1,234 @@ +package capture + +import ( + "bytes" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/containous/alice" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCapture(t *testing.T) { + wrapMiddleware := func(next http.Handler) (http.Handler, error) { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + capt, err := FromContext(req.Context()) + require.NoError(t, err) + + _, err = fmt.Fprintf(rw, "%d,%d,%d,", capt.RequestSize(), capt.ResponseSize(), capt.StatusCode()) + require.NoError(t, err) + + next.ServeHTTP(rw, req) + + _, err = fmt.Fprintf(rw, ",%d,%d,%d", capt.RequestSize(), capt.ResponseSize(), capt.StatusCode()) + require.NoError(t, err) + }), nil + } + + handler := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + _, err := rw.Write([]byte("foo")) + require.NoError(t, err) + + all, err := io.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, "bar", string(all)) + }) + + wrapped := WrapHandler(&Handler{}) + chain := alice.New() + chain = chain.Append(wrapped) + chain = chain.Append(wrapMiddleware) + handlers, err := chain.Then(handler) + require.NoError(t, err) + + request, err := http.NewRequest(http.MethodGet, "/", bytes.NewReader([]byte("bar"))) + require.NoError(t, err) + + recorder := httptest.NewRecorder() + handlers.ServeHTTP(recorder, request) + // 3 = len("bar") + // 9 = len("0,0,0,toto") + assert.Equal(t, "0,0,0,foo,3,9,200", recorder.Body.String()) +} + +// BenchmarkCapture with response writer and request reader +// $ go test -bench=. ./pkg/middlewares/capture/ +// goos: linux +// goarch: amd64 +// pkg: github.com/traefik/traefik/v2/pkg/middlewares/capture +// cpu: Intel(R) Core(TM) i7-10750H CPU @ 2.60GHz +// BenchmarkCapture/2k-12 280507 4015 ns/op 510.03 MB/s 5072 B/op 14 allocs/op +// BenchmarkCapture/20k-12 135726 8301 ns/op 2467.26 MB/s 41936 B/op 14 allocs/op +// BenchmarkCapture/100k-12 45494 26059 ns/op 3929.54 MB/s 213968 B/op 14 allocs/op +// BenchmarkCapture/2k_captured-12 263713 4356 ns/op 470.20 MB/s 5552 B/op 18 allocs/op +// BenchmarkCapture/20k_captured-12 132243 8790 ns/op 2329.98 MB/s 42416 B/op 18 allocs/op +// BenchmarkCapture/100k_captured-12 45650 26587 ns/op 3851.57 MB/s 214448 B/op 18 allocs/op +// BenchmarkCapture/2k_body-12 274135 7471 ns/op 274.12 MB/s 5624 B/op 20 allocs/op +// BenchmarkCapture/20k_body-12 130206 21149 ns/op 968.36 MB/s 42488 B/op 20 allocs/op +// BenchmarkCapture/100k_body-12 41600 51716 ns/op 1980.06 MB/s 214520 B/op 20 allocs/op +// PASS +func BenchmarkCapture(b *testing.B) { + testCases := []struct { + name string + size int + capture bool + body bool + }{ + { + name: "2k", + size: 2048, + }, + { + name: "20k", + size: 20480, + }, + { + name: "100k", + size: 102400, + }, + { + name: "2k captured", + size: 2048, + capture: true, + }, + { + name: "20k captured", + size: 20480, + capture: true, + }, + { + name: "100k captured", + size: 102400, + capture: true, + }, + { + name: "2k body", + size: 2048, + body: true, + }, + { + name: "20k body", + size: 20480, + body: true, + }, + { + name: "100k body", + size: 102400, + body: true, + }, + } + + for _, test := range testCases { + b.Run(test.name, func(b *testing.B) { + baseBody := generateBytes(test.size) + + next := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + n, err := rw.Write(baseBody) + require.Equal(b, test.size, n) + require.NoError(b, err) + }) + + var body io.Reader + if test.body { + body = bytes.NewReader(baseBody) + } + + req, err := http.NewRequest(http.MethodGet, "http://foo/", body) + require.NoError(b, err) + + chain := alice.New() + if test.capture || test.body { + captureWrapped := WrapHandler(&Handler{}) + chain = chain.Append(captureWrapped) + } + handlers, err := chain.Then(next) + require.NoError(b, err) + + b.ReportAllocs() + b.SetBytes(int64(test.size)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + runBenchmark(b, test.size, req, handlers) + } + }) + } +} + +func runBenchmark(b *testing.B, size int, req *http.Request, handler http.Handler) { + b.Helper() + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + if code := recorder.Code; code != 200 { + b.Fatalf("Expected 200 but got %d", code) + } + + assert.Equal(b, size, len(recorder.Body.String())) +} + +func generateBytes(length int) []byte { + var value []byte + for i := 0; i < length; i++ { + value = append(value, 0x61+byte(i%26)) + } + return value +} + +func TestRequestReader(t *testing.T) { + buff := bytes.NewBuffer([]byte("foo")) + rr := readCounter{source: io.NopCloser(buff)} + assert.Equal(t, int64(0), rr.size) + + n, err := rr.Read([]byte("bar")) + require.NoError(t, err) + assert.Equal(t, 3, n) + + err = rr.Close() + require.NoError(t, err) + assert.Equal(t, int64(3), rr.size) +} + +type rwWithCloseNotify struct { + *httptest.ResponseRecorder +} + +func (r *rwWithCloseNotify) CloseNotify() <-chan bool { + panic("implement me") +} + +func TestCloseNotifier(t *testing.T) { + testCases := []struct { + rw http.ResponseWriter + desc string + implementsCloseNotifier bool + }{ + { + rw: httptest.NewRecorder(), + desc: "does not implement CloseNotifier", + implementsCloseNotifier: false, + }, + { + rw: &rwWithCloseNotify{httptest.NewRecorder()}, + desc: "implements CloseNotifier", + implementsCloseNotifier: true, + }, + } + + for _, test := range testCases { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + + _, ok := test.rw.(http.CloseNotifier) + assert.Equal(t, test.implementsCloseNotifier, ok) + + rw := newResponseWriter(test.rw) + _, impl := rw.(http.CloseNotifier) + assert.Equal(t, test.implementsCloseNotifier, impl) + }) + } +} diff --git a/pkg/middlewares/metrics/metrics.go b/pkg/middlewares/metrics/metrics.go index 21392d472..08d0185cc 100644 --- a/pkg/middlewares/metrics/metrics.go +++ b/pkg/middlewares/metrics/metrics.go @@ -13,6 +13,7 @@ import ( "github.com/traefik/traefik/v2/pkg/log" "github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/middlewares" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" "github.com/traefik/traefik/v2/pkg/middlewares/retry" traefiktls "github.com/traefik/traefik/v2/pkg/tls" ) @@ -32,6 +33,8 @@ type metricsMiddleware struct { reqsTLSCounter gokitmetrics.Counter reqDurationHistogram metrics.ScalableHistogram openConnsGauge gokitmetrics.Gauge + reqsBytesCounter gokitmetrics.Counter + respsBytesCounter gokitmetrics.Counter baseLabels []string } @@ -45,6 +48,8 @@ func NewEntryPointMiddleware(ctx context.Context, next http.Handler, registry me reqsTLSCounter: registry.EntryPointReqsTLSCounter(), reqDurationHistogram: registry.EntryPointReqDurationHistogram(), openConnsGauge: registry.EntryPointOpenConnsGauge(), + reqsBytesCounter: registry.EntryPointReqsBytesCounter(), + respsBytesCounter: registry.EntryPointRespsBytesCounter(), baseLabels: []string{"entrypoint", entryPointName}, } } @@ -59,6 +64,8 @@ func NewRouterMiddleware(ctx context.Context, next http.Handler, registry metric reqsTLSCounter: registry.RouterReqsTLSCounter(), reqDurationHistogram: registry.RouterReqDurationHistogram(), openConnsGauge: registry.RouterOpenConnsGauge(), + reqsBytesCounter: registry.RouterReqsBytesCounter(), + respsBytesCounter: registry.RouterRespsBytesCounter(), baseLabels: []string{"router", routerName, "service", serviceName}, } } @@ -73,6 +80,8 @@ func NewServiceMiddleware(ctx context.Context, next http.Handler, registry metri reqsTLSCounter: registry.ServiceReqsTLSCounter(), reqDurationHistogram: registry.ServiceReqDurationHistogram(), openConnsGauge: registry.ServiceOpenConnsGauge(), + reqsBytesCounter: registry.ServiceReqsBytesCounter(), + respsBytesCounter: registry.ServiceRespsBytesCounter(), baseLabels: []string{"service", serviceName}, } } @@ -116,16 +125,22 @@ func (m *metricsMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) m.reqsTLSCounter.With(tlsLabels...).Add(1) } - recorder := newResponseRecorder(rw) start := time.Now() - m.next.ServeHTTP(recorder, req) + m.next.ServeHTTP(rw, req) - labels = append(labels, "code", strconv.Itoa(recorder.getCode())) + ctx := req.Context() + capt, err := capture.FromContext(ctx) + if err != nil { + log.FromContext(middlewares.GetLoggerCtx(ctx, nameEntrypoint, typeName)).Errorf("Could not get Capture: %w", err) + return + } + labels = append(labels, "code", strconv.Itoa(capt.StatusCode())) m.reqDurationHistogram.With(labels...).ObserveFromStart(start) - m.reqsCounter.With(labels...).Add(1) + m.respsBytesCounter.With(labels...).Add(float64(capt.ResponseSize())) + m.reqsBytesCounter.With(labels...).Add(float64(capt.RequestSize())) } func getRequestProtocol(req *http.Request) string { @@ -201,6 +216,6 @@ type RetryListener struct { } // Retried tracks the retry in the RequestMetrics implementation. -func (m *RetryListener) Retried(req *http.Request, attempt int) { +func (m *RetryListener) Retried(_ *http.Request, _ int) { m.retryMetrics.ServiceRetriesCounter().With("service", m.serviceName).Add(1) } diff --git a/pkg/server/middleware/chainbuilder.go b/pkg/server/middleware/chainbuilder.go index 418bdfd14..b4e1223a8 100644 --- a/pkg/server/middleware/chainbuilder.go +++ b/pkg/server/middleware/chainbuilder.go @@ -4,14 +4,13 @@ import ( "context" "github.com/containous/alice" - "github.com/traefik/traefik/v2/pkg/config/static" "github.com/traefik/traefik/v2/pkg/log" "github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/middlewares/accesslog" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" metricsmiddleware "github.com/traefik/traefik/v2/pkg/middlewares/metrics" mTracing "github.com/traefik/traefik/v2/pkg/middlewares/tracing" "github.com/traefik/traefik/v2/pkg/tracing" - "github.com/traefik/traefik/v2/pkg/tracing/jaeger" ) // ChainBuilder Creates a middleware chain by entry point. It is used for middlewares that are created almost systematically and that need to be created before all others. @@ -19,14 +18,16 @@ type ChainBuilder struct { metricsRegistry metrics.Registry accessLoggerMiddleware *accesslog.Handler tracer *tracing.Tracing + captureMiddleware *capture.Handler } // NewChainBuilder Creates a new ChainBuilder. -func NewChainBuilder(staticConfiguration static.Configuration, metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler) *ChainBuilder { +func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer *tracing.Tracing, captureMiddleware *capture.Handler) *ChainBuilder { return &ChainBuilder{ metricsRegistry: metricsRegistry, accessLoggerMiddleware: accessLoggerMiddleware, - tracer: setupTracing(staticConfiguration.Tracing), + tracer: tracer, + captureMiddleware: captureMiddleware, } } @@ -34,6 +35,10 @@ func NewChainBuilder(staticConfiguration static.Configuration, metricsRegistry m func (c *ChainBuilder) Build(ctx context.Context, entryPointName string) alice.Chain { chain := alice.New() + if c.captureMiddleware != nil { + chain = chain.Append(capture.WrapHandler(c.captureMiddleware)) + } + if c.accessLoggerMiddleware != nil { chain = chain.Append(accesslog.WrapHandler(c.accessLoggerMiddleware)) } @@ -61,69 +66,3 @@ func (c *ChainBuilder) Close() { c.tracer.Close() } } - -func setupTracing(conf *static.Tracing) *tracing.Tracing { - if conf == nil { - return nil - } - - var backend tracing.Backend - - if conf.Jaeger != nil { - backend = conf.Jaeger - } - - if conf.Zipkin != nil { - if backend != nil { - log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Zipkin backend.") - } else { - backend = conf.Zipkin - } - } - - if conf.Datadog != nil { - if backend != nil { - log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Datadog backend.") - } else { - backend = conf.Datadog - } - } - - if conf.Instana != nil { - if backend != nil { - log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Instana backend.") - } else { - backend = conf.Instana - } - } - - if conf.Haystack != nil { - if backend != nil { - log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Haystack backend.") - } else { - backend = conf.Haystack - } - } - - if conf.Elastic != nil { - if backend != nil { - log.WithoutContext().Error("Multiple tracing backend are not supported: cannot create Elastic backend.") - } else { - backend = conf.Elastic - } - } - - if backend == nil { - log.WithoutContext().Debug("Could not initialize tracing, using Jaeger by default") - defaultBackend := &jaeger.Config{} - defaultBackend.SetDefaults() - backend = defaultBackend - } - - tracer, err := tracing.NewTracing(conf.ServiceName, conf.SpanNameLimit, backend) - if err != nil { - log.WithoutContext().Warnf("Unable to create tracer: %v", err) - return nil - } - return tracer -} diff --git a/pkg/server/router/router_test.go b/pkg/server/router/router_test.go index 390a9a812..867da0a9b 100644 --- a/pkg/server/router/router_test.go +++ b/pkg/server/router/router_test.go @@ -8,13 +8,14 @@ import ( "strings" "testing" + "github.com/containous/alice" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/traefik/traefik/v2/pkg/config/dynamic" "github.com/traefik/traefik/v2/pkg/config/runtime" - "github.com/traefik/traefik/v2/pkg/config/static" "github.com/traefik/traefik/v2/pkg/metrics" "github.com/traefik/traefik/v2/pkg/middlewares/accesslog" + "github.com/traefik/traefik/v2/pkg/middlewares/capture" "github.com/traefik/traefik/v2/pkg/middlewares/requestdecorator" "github.com/traefik/traefik/v2/pkg/server/middleware" "github.com/traefik/traefik/v2/pkg/server/service" @@ -315,7 +316,7 @@ func TestRouterManager_Get(t *testing.T) { roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}}) serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager) middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil) - chainBuilder := middleware.NewChainBuilder(static.Configuration{}, nil, nil) + chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil) routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry()) @@ -421,7 +422,7 @@ func TestAccessLog(t *testing.T) { roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}}) serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager) middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil) - chainBuilder := middleware.NewChainBuilder(static.Configuration{}, nil, nil) + chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil) routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry()) @@ -437,7 +438,10 @@ func TestAccessLog(t *testing.T) { reqHost := requestdecorator.New(nil) - accesslogger.ServeHTTP(w, req, http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + chain := alice.New() + chain = chain.Append(capture.WrapHandler(&capture.Handler{})) + chain = chain.Append(accesslog.WrapHandler(accesslogger)) + handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { reqHost.ServeHTTP(w, req, handlers["web"].ServeHTTP) data := accesslog.GetLogData(req) @@ -445,6 +449,9 @@ func TestAccessLog(t *testing.T) { assert.Equal(t, test.expected, data.Core[accesslog.RouterName]) })) + require.NoError(t, err) + + handler.ServeHTTP(w, req) }) } } @@ -710,7 +717,7 @@ func TestRuntimeConfiguration(t *testing.T) { roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}}) serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager) middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil) - chainBuilder := middleware.NewChainBuilder(static.Configuration{}, nil, nil) + chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil) routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry()) @@ -743,14 +750,6 @@ func TestRuntimeConfiguration(t *testing.T) { func TestProviderOnMiddlewares(t *testing.T) { entryPoints := []string{"web"} - staticCfg := static.Configuration{ - EntryPoints: map[string]*static.EntryPoint{ - "web": { - Address: ":80", - }, - }, - } - rtConf := runtime.NewConfig(dynamic.Configuration{ HTTP: &dynamic.HTTPConfiguration{ Services: map[string]*dynamic.Service{ @@ -793,7 +792,7 @@ func TestProviderOnMiddlewares(t *testing.T) { roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}}) serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager) middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil) - chainBuilder := middleware.NewChainBuilder(staticCfg, nil, nil) + chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil) routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry()) @@ -861,7 +860,7 @@ func BenchmarkRouterServe(b *testing.B) { serviceManager := service.NewManager(rtConf.Services, nil, nil, staticRoundTripperGetter{res}) middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil) - chainBuilder := middleware.NewChainBuilder(static.Configuration{}, nil, nil) + chainBuilder := middleware.NewChainBuilder(nil, nil, nil, nil) routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry()) diff --git a/pkg/server/routerfactory_test.go b/pkg/server/routerfactory_test.go index 612bd3bc2..0c6697e38 100644 --- a/pkg/server/routerfactory_test.go +++ b/pkg/server/routerfactory_test.go @@ -53,7 +53,7 @@ func TestReuseService(t *testing.T) { managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil) tlsManager := tls.NewManager() - factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil), nil, metrics.NewVoidRegistry()) + factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil, nil), nil, metrics.NewVoidRegistry()) entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs})) @@ -189,7 +189,7 @@ func TestServerResponseEmptyBackend(t *testing.T) { managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil) tlsManager := tls.NewManager() - factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil), nil, metrics.NewVoidRegistry()) + factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil, nil), nil, metrics.NewVoidRegistry()) entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)})) @@ -232,7 +232,7 @@ func TestInternalServices(t *testing.T) { voidRegistry := metrics.NewVoidRegistry() - factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, voidRegistry, nil), nil, voidRegistry) + factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil, nil), nil, voidRegistry) entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs})) diff --git a/pkg/server/server_signals_windows.go b/pkg/server/server_signals_windows.go index 91c14979d..8bfd91fcf 100644 --- a/pkg/server/server_signals_windows.go +++ b/pkg/server/server_signals_windows.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package server