diff --git a/go.mod b/go.mod index f0f2c3ed..d8987625 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/riverqueue/river v0.38.0 github.com/riverqueue/river/riverdriver/riverpgxv5 v0.38.0 github.com/riverqueue/river/rivertype v0.38.0 - github.com/riverqueue/rivercontrib/otelriver v0.10.0 + github.com/riverqueue/rivercontrib/otelriver v0.11.0 github.com/rodaine/table v1.3.1 github.com/sourcegraph/jsonrpc2 v0.2.1 github.com/spf13/cobra v1.10.2 diff --git a/go.sum b/go.sum index eff545b6..4145dc8c 100644 --- a/go.sum +++ b/go.sum @@ -422,6 +422,8 @@ github.com/riverqueue/rivercontrib/otelriver v0.8.0 h1:zBFuoMhcGq0P1rrNl+kbxW6A2 github.com/riverqueue/rivercontrib/otelriver v0.8.0/go.mod h1:qwnMagI9IsFGEaKlp5oMecLGxE4byhOk8kqis7oeomo= github.com/riverqueue/rivercontrib/otelriver v0.10.0 h1:cOtwJ6PyVGQWN45XfuSQSMJAV60KozgES8vaHE9u1F0= github.com/riverqueue/rivercontrib/otelriver v0.10.0/go.mod h1:Ewb2HiCy9yoltuomU4yZcdvBRHQBNo2683qhChq2JOQ= +github.com/riverqueue/rivercontrib/otelriver v0.11.0 h1:galpkbNRywwtrENLbHn+GB4F8lNYg2Mtt/025qe7cPQ= +github.com/riverqueue/rivercontrib/otelriver v0.11.0/go.mod h1:Ewb2HiCy9yoltuomU4yZcdvBRHQBNo2683qhChq2JOQ= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rodaine/protogofakeit v0.1.1 h1:ZKouljuRM3A+TArppfBqnH8tGZHOwM/pjvtXe9DaXH8= diff --git a/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go b/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go index 63173f3b..27ea91e0 100644 --- a/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go +++ b/vendor/github.com/riverqueue/rivercontrib/otelriver/middleware.go @@ -3,14 +3,17 @@ package otelriver import ( "cmp" "context" + "encoding/json" "errors" "slices" "time" + "github.com/tidwall/sjson" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "github.com/riverqueue/river" @@ -45,6 +48,11 @@ type MiddlewareConfig struct { // metric names, with attributes differentiating them. EnableSemanticMetrics bool + // EnableTracePropagation injects W3C trace context (traceparent/tracestate) + // into job metadata on insert and extracts it on work, adding a span link + // from the work span back to the span that enqueued the job. + EnableTracePropagation bool + // EnableWorkSpanJobKindSuffix appends the job kind a suffix to work spans // so they look like `river.work/my_job` instead of `river.work`. EnableWorkSpanJobKindSuffix bool @@ -208,6 +216,12 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job } }() + if m.config.EnableTracePropagation { + for i := range manyParams { + manyParams[i].Metadata = injectTraceContext(ctx, manyParams[i].Metadata) + } + } + insertRes, err = doInner(ctx) panicked = false return insertRes, err @@ -219,8 +233,18 @@ func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner fu spanName += "/" + job.Kind } + var startOpts []trace.SpanStartOption + if m.config.EnableTracePropagation { + //nolint:contextcheck + if sc := extractSpanContext(job.Metadata); sc.IsValid() { + // We use a *link* to the span that enqueued this value, because river jobs are async by nature, so they may happen + // minutes, hours, or even days after they're enqueued, which can lead to really weird span contexts if a direct parent + // relationship is used. + startOpts = append(startOpts, trace.WithLinks(trace.Link{SpanContext: sc})) + } + } ctx, span := m.tracer.Start(ctx, spanName, - trace.WithSpanKind(trace.SpanKindConsumer)) + append(startOpts, trace.WithSpanKind(trace.SpanKindConsumer))...) defer span.End() attrs := []attribute.KeyValue{ @@ -336,6 +360,54 @@ func mustInt64Counter(meter metric.Meter, name string, options ...metric.Int64Co return metric } +// injectTraceContext injects the current span context from ctx into metadata +// JSON under the W3C "traceparent" (and optionally "tracestate") key. If +// injection fails for any reason the original metadata is returned unchanged. +func injectTraceContext(ctx context.Context, metadata []byte) []byte { + carrier := make(propagation.MapCarrier) + propagation.TraceContext{}.Inject(ctx, carrier) + if len(carrier) == 0 { + return metadata + } + if len(metadata) == 0 { + metadata = []byte("{}") + } + original := metadata + for k, v := range carrier { + var err error + metadata, err = sjson.SetBytes(metadata, k, v) + if err != nil { + return original + } + } + return metadata +} + +// extractSpanContext reads W3C trace context from metadata JSON and returns the +// remote SpanContext it encodes. Returns a zero SpanContext (IsValid() == false) +// if no traceparent is present or the metadata cannot be parsed. +func extractSpanContext(metadata []byte) trace.SpanContext { + if len(metadata) == 0 { + return trace.SpanContext{} + } + var meta map[string]any + if err := json.Unmarshal(metadata, &meta); err != nil { + return trace.SpanContext{} + } + carrier := make(propagation.MapCarrier) + for k, v := range meta { + if s, ok := v.(string); ok { + carrier[k] = s + } + } + // We use context.Background here because the only purpose of this function is to return + // a span context for *linking*. If one doesn't exist, we don't want to extract anything - + // and we certainly don't want to extract the span from `ctx`, which would most often lead to us + // linking to ourselves, which is pretty obviously incorrect! + extracted := propagation.TraceContext{}.Extract(context.Background(), carrier) + return trace.SpanFromContext(extracted).SpanContext() +} + // Sets success status on the given span and within the set of attributes. The // index of the status attribute is required ahead of time as a minor // optimization. diff --git a/vendor/modules.txt b/vendor/modules.txt index 650d009e..ed0ff7b8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -807,7 +807,7 @@ github.com/riverqueue/river/rivershared/util/valutil # github.com/riverqueue/river/rivertype v0.38.0 ## explicit; go 1.25.0 github.com/riverqueue/river/rivertype -# github.com/riverqueue/rivercontrib/otelriver v0.10.0 +# github.com/riverqueue/rivercontrib/otelriver v0.11.0 ## explicit; go 1.25.0 github.com/riverqueue/rivercontrib/otelriver # github.com/rodaine/table v1.3.1