Support Datadog tracer priority sampling

This commit is contained in:
Joost Cassee 2019-01-16 17:08:06 +01:00 committed by Traefiker Bot
parent e5fb1ffeb7
commit 0de1ff8634
14 changed files with 226 additions and 82 deletions

6
Gopkg.lock generated
View file

@ -1962,7 +1962,7 @@
version = "v1.12.0"
[[projects]]
digest = "1:7ed5f088604ff1018ff0e80f4b80765cab92bbe4763b50de852af884d24a526b"
digest = "1:b886012746f19e2a7c6c3901ea9f86e8a5e32ff2b4407086f4f3181269976957"
name = "gopkg.in/DataDog/dd-trace-go.v1"
packages = [
"ddtrace",
@ -1972,8 +1972,8 @@
"ddtrace/tracer",
]
pruneopts = "NUT"
revision = "48eeff27357376bcb31a15674dc4be9078de88b3"
version = "v1.5.0"
revision = "7fb2bce4b1ed6ab61f7a9e1be30dea56de19db7c"
version = "v1.8.0"
[[projects]]
digest = "1:c970218a20933dd0a2eb2006de922217fa9276f57d25009b2a934eb1c50031cc"

View file

@ -255,4 +255,4 @@
[[constraint]]
name = "gopkg.in/DataDog/dd-trace-go.v1"
version = "1.5.0"
version = "1.7.0"

View file

@ -110,6 +110,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
LocalAgentHostPort: "localhost:8126",
GlobalTag: "",
Debug: false,
PrioritySampling: false,
},
}

View file

@ -174,4 +174,10 @@ Traefik supports three tracing backends: Jaeger, Zipkin and DataDog.
#
globalTag = ""
# Enable priority sampling. When using distributed tracing, this option must be enabled in order
# to get all the parts of a distributed trace sampled.
#
# Default: false
#
prioritySampling = false
```

View file

@ -223,6 +223,7 @@ func (gc *GlobalConfiguration) initTracing() {
LocalAgentHostPort: "localhost:8126",
GlobalTag: "",
Debug: false,
PrioritySampling: false,
}
}
if gc.Tracing.Zipkin != nil {

View file

@ -18,6 +18,7 @@ type Config struct {
LocalAgentHostPort string `description:"Set datadog-agent's host:port that the reporter will used. Defaults to localhost:8126" export:"false"`
GlobalTag string `description:"Key:Value tag to be set on all the spans." export:"true"`
Debug bool `description:"Enable DataDog debug." export:"true"`
PrioritySampling bool `description:"Enable priority sampling. When using distributed tracing, this option must be enabled in order to get all the parts of a distributed trace sampled."`
}
// Setup sets up the tracer
@ -29,12 +30,16 @@ func (c *Config) Setup(serviceName string) (opentracing.Tracer, io.Closer, error
value = tag[1]
}
tracer := ddtracer.New(
opts := []datadog.StartOption{
datadog.WithAgentAddr(c.LocalAgentHostPort),
datadog.WithServiceName(serviceName),
datadog.WithGlobalTag(tag[0], value),
datadog.WithDebugMode(c.Debug),
)
}
if c.PrioritySampling {
opts = append(opts, datadog.WithPrioritySampling())
}
tracer := ddtracer.New(opts...)
// Without this, child spans are getting the NOOP tracer
opentracing.SetGlobalTracer(tracer)

View file

@ -18,6 +18,7 @@ type Config struct {
LocalAgentHostPort string `description:"Set datadog-agent's host:port that the reporter will used. Defaults to localhost:8126" export:"false"`
GlobalTag string `description:"Key:Value tag to be set on all the spans." export:"true"`
Debug bool `description:"Enable DataDog debug." export:"true"`
PrioritySampling bool `description:"Enable priority sampling. When using distributed tracing, this option must be enabled in order to get all the parts of a distributed trace sampled."`
}
// Setup sets up the tracer
@ -29,12 +30,16 @@ func (c *Config) Setup(serviceName string) (opentracing.Tracer, io.Closer, error
value = tag[1]
}
tracer := ddtracer.New(
opts := []datadog.StartOption{
datadog.WithAgentAddr(c.LocalAgentHostPort),
datadog.WithServiceName(serviceName),
datadog.WithGlobalTag(tag[0], value),
datadog.WithDebugMode(c.Debug),
)
}
if c.PrioritySampling {
opts = append(opts, datadog.WithPrioritySampling())
}
tracer := ddtracer.New(opts...)
// Without this, child spans are getting the NOOP tracer
opentracing.SetGlobalTracer(tracer)

View file

@ -1,6 +1,7 @@
package tracer
import (
"net/http"
"os"
"path/filepath"
"time"
@ -33,6 +34,9 @@ type config struct {
// propagator propagates span context cross-process
propagator Propagator
// httpRoundTripper defines the http.RoundTripper used by the agent transport.
httpRoundTripper http.RoundTripper
}
// StartOption represents a function that can be provided as a parameter to Start.
@ -45,6 +49,17 @@ func defaults(c *config) {
c.agentAddr = defaultAddress
}
// WithPrioritySampling is a no-op option kept for backward compatibility:
// priority sampling is enabled by default.
// When using distributed tracing, the priority sampling value is propagated in order to
// get all the parts of a distributed trace sampled.
// To learn more about priority sampling, please visit:
// https://docs.datadoghq.com/tracing/getting_further/trace_sampling_and_storage/#priority-sampling-for-distributed-tracing
//
// Deprecated: priority sampling is enabled by default, so passing this option
// has no effect on the tracer configuration.
func WithPrioritySampling() StartOption {
return func(c *config) {
// This is now enabled by default.
}
}
// WithDebugMode enables debug mode on the tracer, resulting in more verbose logging.
func WithDebugMode(enabled bool) StartOption {
return func(c *config) {
@ -93,6 +108,15 @@ func WithSampler(s Sampler) StartOption {
}
}
// WithHTTPRoundTripper returns a StartOption that stores r as the
// http.RoundTripper of the tracer configuration, which customizes the
// underlying HTTP transport used for emitting spans (for example to emit
// spans to a unix domain socket). The default should be used in most cases.
func WithHTTPRoundTripper(r http.RoundTripper) StartOption {
	setRoundTripper := func(cfg *config) {
		cfg.httpRoundTripper = r
	}
	return setRoundTripper
}
// StartSpanOption is a configuration option for StartSpan. It is aliased in order
// to help godoc group all the functions returning it together. It is considered
// more correct to refer to it as the type as the origin, ddtrace.StartSpanOption.

View file

@ -1,10 +1,13 @@
package tracer
import (
"encoding/json"
"io"
"math"
"sync"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
)
// Sampler is the generic interface of any sampler. It must be safe for concurrent use.
@ -59,14 +62,83 @@ const knuthFactor = uint64(1111111111111111111)
// Sample returns true if the given span should be sampled.
func (r *rateSampler) Sample(spn ddtrace.Span) bool {
if r.rate == 1 {
// fast path
return true
}
s, ok := spn.(*span)
if !ok {
return false
}
r.RLock()
defer r.RUnlock()
if r.rate < 1 {
return s.TraceID*knuthFactor < uint64(r.rate*math.MaxUint64)
return sampledByRate(s.TraceID, r.rate)
}
// sampledByRate reports whether the number n should be sampled at the
// specified rate. Any rate that is not strictly below 1 always samples;
// otherwise n is multiplied by knuthFactor and compared against the
// rate-scaled uint64 range.
func sampledByRate(n uint64, rate float64) bool {
	if !(rate < 1) {
		// rates of 1 and above (and non-comparable values) keep everything
		return true
	}
	threshold := uint64(rate * math.MaxUint64)
	return n*knuthFactor < threshold
}
// prioritySampler holds a set of per-service sampling rates and applies
// them to spans.
type prioritySampler struct {
	mu          sync.RWMutex       // guards rates and defaultRate
	rates       map[string]float64 // sampling rate keyed by "service:<name>,env:<env>"
	defaultRate float64            // rate used when no per-service key matches
}

// newPrioritySampler returns a prioritySampler with no per-service rates and
// a default rate of 1.
func newPrioritySampler() *prioritySampler {
	return &prioritySampler{
		rates:       make(map[string]float64),
		defaultRate: 1.,
	}
}

// readRatesJSON decodes a JSON object carrying a "rate_by_service" map from rc
// and replaces the sampler's per-service rates with it. When the map contains
// the "service:,env:" key, its value becomes the default rate and the key is
// removed from the per-service map.
//
// rc is closed on every return path. If decoding fails, the decode error is
// returned and the rates currently held by the sampler are left untouched.
func (ps *prioritySampler) readRatesJSON(rc io.ReadCloser) error {
	// Close unconditionally so that a malformed payload does not leak the
	// underlying reader.
	defer rc.Close()

	var payload struct {
		Rates map[string]float64 `json:"rate_by_service"`
	}
	if err := json.NewDecoder(rc).Decode(&payload); err != nil {
		return err
	}

	const defaultRateKey = "service:,env:"
	ps.mu.Lock()
	defer ps.mu.Unlock()
	ps.rates = payload.Rates
	if v, ok := ps.rates[defaultRateKey]; ok {
		ps.defaultRate = v
		delete(ps.rates, defaultRateKey)
	}
	return nil
}
// getRate looks up the sampling rate for the given span. The lookup key is
// built from the span's service and its environment meta value; when no rate
// is registered under that key, the sampler's default rate is returned.
// Callers must guard the span.
func (ps *prioritySampler) getRate(spn *span) float64 {
	env := spn.Meta[ext.Environment]
	lookup := "service:" + spn.Service + ",env:" + env

	ps.mu.RLock()
	defer ps.mu.RUnlock()

	rate, found := ps.rates[lookup]
	if !found {
		rate = ps.defaultRate
	}
	return rate
}
// apply tags the given span with a sampling priority and with the rate that
// produced it: the priority is auto-keep when the span's trace ID is sampled
// at the rate returned by getRate, and auto-reject otherwise. Caller must
// ensure it is safe to modify the span.
func (ps *prioritySampler) apply(spn *span) {
	rate := ps.getRate(spn)

	priority := ext.PriorityAutoReject
	if sampledByRate(spn.TraceID, rate) {
		priority = ext.PriorityAutoKeep
	}

	spn.SetTag(ext.SamplingPriority, priority)
	spn.SetTag(samplingPriorityRateKey, rate)
}

View file

@ -201,8 +201,8 @@ func (s *span) finish(finishTime int64) {
}
s.finished = true
if !s.context.sampled {
// not sampled
if s.context.drop {
// not sampled by local sampler
return
}
s.context.finish()
@ -235,4 +235,7 @@ func (s *span) String() string {
return strings.Join(lines, "\n")
}
const samplingPriorityKey = "_sampling_priority_v1"
const (
samplingPriorityKey = "_sampling_priority_v1"
samplingPriorityRateKey = "_sampling_priority_rate_v1"
)

View file

@ -16,9 +16,9 @@ var _ ddtrace.SpanContext = (*spanContext)(nil)
type spanContext struct {
// the below group should propagate only locally
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
sampled bool // whether this span will be sampled or not
trace *trace // reference to the trace that this span belongs too
span *span // reference to the span that hosts this context
drop bool // when true, the span will not be sent to the agent
// the below group should propagate cross-process
@ -40,7 +40,6 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
context := &spanContext{
traceID: span.TraceID,
spanID: span.SpanID,
sampled: true,
span: span,
}
if v, ok := span.Metrics[samplingPriorityKey]; ok {
@ -49,7 +48,7 @@ func newSpanContext(span *span, parent *spanContext) *spanContext {
}
if parent != nil {
context.trace = parent.trace
context.sampled = parent.sampled
context.drop = parent.drop
context.hasPriority = parent.hasSamplingPriority()
context.priority = parent.samplingPriority()
parent.ForeachBaggageItem(func(k, v string) bool {

View file

@ -175,11 +175,11 @@ func (p *propagator) extractTextMap(reader TextMapReader) (ddtrace.SpanContext,
return ErrSpanContextCorrupted
}
case p.cfg.PriorityHeader:
ctx.priority, err = strconv.Atoi(v)
priority, err := strconv.Atoi(v)
if err != nil {
return ErrSpanContextCorrupted
}
ctx.hasPriority = true
ctx.setSamplingPriority(priority)
default:
if strings.HasPrefix(key, p.cfg.BaggagePrefix) {
ctx.setBaggageItem(strings.TrimPrefix(key, p.cfg.BaggagePrefix), v)

View file

@ -41,6 +41,9 @@ type tracer struct {
// a synchronous (blocking) operation, meaning that it will only return after
// the trace has been fully processed and added onto the payload.
syncPush chan struct{}
// prioritySampling holds an instance of the priority sampler.
prioritySampling *prioritySampler
}
const (
@ -112,21 +115,22 @@ func newTracer(opts ...StartOption) *tracer {
fn(c)
}
if c.transport == nil {
c.transport = newTransport(c.agentAddr)
c.transport = newTransport(c.agentAddr, c.httpRoundTripper)
}
if c.propagator == nil {
c.propagator = NewPropagator(nil)
}
t := &tracer{
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
config: c,
payload: newPayload(),
flushAllReq: make(chan chan<- struct{}),
flushTracesReq: make(chan struct{}, 1),
flushErrorsReq: make(chan struct{}, 1),
exitReq: make(chan struct{}),
payloadQueue: make(chan []*span, payloadQueueSize),
errorBuffer: make(chan error, errorBufferSize),
stopped: make(chan struct{}),
prioritySampling: newPrioritySampler(),
}
go t.worker()
@ -247,6 +251,7 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
span.Metrics[samplingPriorityKey] = float64(context.samplingPriority())
}
if context.span != nil {
// it has a local parent, inherit the service
context.span.RLock()
span.Service = context.span.Service
context.span.RUnlock()
@ -254,9 +259,8 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
}
span.context = newSpanContext(span, context)
if context == nil || context.span == nil {
// this is either a global root span or a process-level root span
// this is either a root span or it has a remote parent, we should add the PID.
span.SetTag(ext.Pid, strconv.Itoa(os.Getpid()))
t.sample(span)
}
// add tags from options
for k, v := range opts.Tags {
@ -266,6 +270,10 @@ func (t *tracer) StartSpan(operationName string, options ...ddtrace.StartSpanOpt
for k, v := range t.config.globalTags {
span.SetTag(k, v)
}
if context == nil {
// this is a brand new trace, sample it
t.sample(span)
}
return span
}
@ -299,10 +307,13 @@ func (t *tracer) flushTraces() {
if t.config.debug {
log.Printf("Sending payload: size: %d traces: %d\n", size, count)
}
err := t.config.transport.send(t.payload)
rc, err := t.config.transport.send(t.payload)
if err != nil {
t.pushError(&dataLossError{context: err, count: count})
}
if err == nil {
t.prioritySampling.readRatesJSON(rc) // TODO: handle error?
}
t.payload.reset()
}
@ -350,21 +361,17 @@ const sampleRateMetricKey = "_sample_rate"
// Sample samples a span with the internal sampler.
func (t *tracer) sample(span *span) {
if span.context.hasPriority {
// sampling decision was already made
return
}
sampler := t.config.sampler
sampled := sampler.Sample(span)
span.context.sampled = sampled
if !sampled {
if !sampler.Sample(span) {
span.context.drop = true
return
}
if rs, ok := sampler.(RateSampler); ok && rs.Rate() < 1 {
// the span was sampled using a rate sampler which wasn't all permissive,
// so we make note of the sampling rate.
span.Lock()
defer span.Unlock()
if span.finished {
// we don't touch finished span as they might be flushing
return
}
span.Metrics[sampleRateMetricKey] = rs.Rate()
}
t.prioritySampling.apply(span)
}

View file

@ -2,17 +2,37 @@ package tracer
import (
"fmt"
"io"
"net"
"net/http"
"os"
"runtime"
"strconv"
"strings"
"time"
)
// TODO(gbbr): find a more effective way to keep this up to date,
// e.g. via `go generate`
var tracerVersion = "v1.5.0"
var (
// TODO(gbbr): find a more effective way to keep this up to date,
// e.g. via `go generate`
tracerVersion = "v1.7.0"
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
defaultRoundTripper = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
)
const (
defaultHostname = "localhost"
@ -22,25 +42,33 @@ const (
traceCountHeader = "X-Datadog-Trace-Count" // header containing the number of traces in the payload
)
// Transport is an interface for span submission to the agent.
// transport is an interface for span submission to the agent.
type transport interface {
send(p *payload) error
// send sends the payload p to the agent using the transport set up.
// It returns a non-nil response body when no error occurred.
send(p *payload) (body io.ReadCloser, err error)
}
// newTransport returns a new Transport implementation that sends traces to a
// trace agent running on the given hostname and port. If the zero values for
// hostname and port are provided, the default values will be used ("localhost"
// for hostname, and "8126" for port).
// trace agent running on the given hostname and port, using a given
// http.RoundTripper. If the zero values for hostname and port are provided,
// the default values will be used ("localhost" for hostname, and "8126" for
// port). If roundTripper is nil, a default is used.
//
// In general, using this method is only necessary if you have a trace agent
// running on a non-default port or if it's located on another machine.
func newTransport(addr string) transport {
return newHTTPTransport(addr)
// running on a non-default port, if it's located on another machine, or when
// otherwise needing to customize the transport layer, for instance when using
// a unix domain socket.
func newTransport(addr string, roundTripper http.RoundTripper) transport {
if roundTripper == nil {
roundTripper = defaultRoundTripper
}
return newHTTPTransport(addr, roundTripper)
}
// newDefaultTransport returns a default transport for this tracing client.
func newDefaultTransport() transport {
return newHTTPTransport(defaultAddress)
return newHTTPTransport(defaultAddress, defaultRoundTripper)
}
type httpTransport struct {
@ -50,7 +78,7 @@ type httpTransport struct {
}
// newHTTPTransport returns an httpTransport for the given endpoint
func newHTTPTransport(addr string) *httpTransport {
func newHTTPTransport(addr string, roundTripper http.RoundTripper) *httpTransport {
// initialize the default EncoderPool with Encoder headers
defaultHeaders := map[string]string{
"Datadog-Meta-Lang": "go",
@ -60,34 +88,20 @@ func newHTTPTransport(addr string) *httpTransport {
"Content-Type": "application/msgpack",
}
return &httpTransport{
traceURL: fmt.Sprintf("http://%s/v0.3/traces", resolveAddr(addr)),
traceURL: fmt.Sprintf("http://%s/v0.4/traces", resolveAddr(addr)),
client: &http.Client{
// We copy the transport to avoid using the default one, as it might be
// augmented with tracing and we don't want these calls to be recorded.
// See https://golang.org/pkg/net/http/#DefaultTransport .
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
Timeout: defaultHTTPTimeout,
Transport: roundTripper,
Timeout: defaultHTTPTimeout,
},
headers: defaultHeaders,
}
}
func (t *httpTransport) send(p *payload) error {
func (t *httpTransport) send(p *payload) (body io.ReadCloser, err error) {
// prepare the client and send the payload
req, err := http.NewRequest("POST", t.traceURL, p)
if err != nil {
return fmt.Errorf("cannot create http request: %v", err)
return nil, fmt.Errorf("cannot create http request: %v", err)
}
for header, value := range t.headers {
req.Header.Set(header, value)
@ -96,25 +110,26 @@ func (t *httpTransport) send(p *payload) error {
req.Header.Set("Content-Length", strconv.Itoa(p.size()))
response, err := t.client.Do(req)
if err != nil {
return err
return nil, err
}
defer response.Body.Close()
if code := response.StatusCode; code >= 400 {
// error, check the body for context information and
// return a nice error.
msg := make([]byte, 1000)
n, _ := response.Body.Read(msg)
response.Body.Close()
txt := http.StatusText(code)
if n > 0 {
return fmt.Errorf("%s (Status: %s)", msg[:n], txt)
return nil, fmt.Errorf("%s (Status: %s)", msg[:n], txt)
}
return fmt.Errorf("%s", txt)
return nil, fmt.Errorf("%s", txt)
}
return nil
return response.Body, nil
}
// resolveAddr resolves the given agent address and fills in any missing host
// and port using the defaults.
// and port using the defaults. Some environment variable settings will
// take precedence over configuration.
func resolveAddr(addr string) string {
host, port, err := net.SplitHostPort(addr)
if err != nil {
@ -127,5 +142,11 @@ func resolveAddr(addr string) string {
if port == "" {
port = defaultPort
}
if v := os.Getenv("DD_AGENT_HOST"); v != "" {
host = v
}
if v := os.Getenv("DD_TRACE_AGENT_PORT"); v != "" {
port = v
}
return fmt.Sprintf("%s:%s", host, port)
}