Arrange Act Assert

Jag Reehals thinking on things, mostly product development

Instrumenting Message Queues? Autotel Handles the Ceremony

18 Jan 2026

The OneUptime team is spot on in their Instrument Message Queues with OpenTelemetry post.

Inject trace context on the producer, extract on the consumer; use PRODUCER and CONSUMER span kinds; set semantic conventions (messaging.system, messaging.destination.name, messaging.operation, Kafka partition/offset/consumer group).

They show the raw OpenTelemetry code. It's comprehensive. It's also verbose. Every team ends up re-implementing the same patterns: inject, extract, span kinds, semantic attributes, error handling.

We've all been there: copying "best practice" code from blog posts and adapting it for our broker.

Their key insight:

For batch processing, use a batch span with links or child spans to contributing traces.

But there's still a gap...

I built Autotel to close that gap.

While this demo uses Kafka, the same PRODUCER/CONSUMER span patterns apply to other brokers.

Autotel makes message queue instrumentation the default. PRODUCER/CONSUMER span kinds automatic. Semantic attributes automatic. Error handling with recordException() and setStatus(ERROR) in the wrappers. Batch spans with links via extractBatchLineage().

The Problem You're Solving

The OneUptime article shows what correct instrumentation looks like:

// Manual approach: producer span with all the ceremony
import { trace, context, SpanKind, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function publishMessage(topic: string, payload: any) {
  const span = tracer.startSpan(`${topic} publish`, {
    kind: SpanKind.PRODUCER,
    attributes: {
      'messaging.system': 'kafka',
      'messaging.destination.name': topic,
      'messaging.operation': 'publish',
    },
  });

  try {
    const headers: Record<string, string> = {};

    // Inject trace context into headers
    propagation.inject(context.active(), headers, {
      set: (carrier, key, value) => { carrier[key] = value; }
    });

    await producer.send({
      topic,
      messages: [{ value: JSON.stringify(payload), headers }],
    });
  } catch (error) {
    span.recordException(error as Error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
    throw error;
  } finally {
    span.end();
  }
}
// Manual approach: consumer span with semantic conventions
async function consumeMessage(msg: KafkaMessage) {
  // Extract trace context from headers
  const parentContext = propagation.extract(context.active(), msg.headers, {
    get: (carrier, key) => carrier[key]?.toString()
  });

  const span = tracer.startSpan(`${topic} process`, {
    kind: SpanKind.CONSUMER,
    attributes: {
      'messaging.system': 'kafka',
      'messaging.destination.name': topic,
      'messaging.operation': 'process',
      'messaging.kafka.consumer.group': consumerGroup,
      'messaging.kafka.partition': msg.partition,
      'messaging.kafka.offset': msg.offset.toString(),
    },
  }, parentContext);

  try {
    await context.with(trace.setSpan(parentContext, span), async () => {
      await processMessage(msg);
    });
  } catch (error) {
    span.recordException(error as Error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
    throw error;
  } finally {
    span.end();
  }
}
// Manual approach: batch span with child spans
async function processBatch(messages: KafkaMessage[]) {
  const batchSpan = tracer.startSpan('batch process', {
    kind: SpanKind.CONSUMER,
    attributes: {
      'messaging.batch.message_count': messages.length,
    },
  });

  try {
    for (const msg of messages) {
      const parentContext = propagation.extract(context.active(), msg.headers, {
        get: (carrier, key) => carrier[key]?.toString()
      });

      const childSpan = tracer.startSpan(`message process`, {
        kind: SpanKind.CONSUMER,
        links: [{ context: trace.getSpanContext(parentContext)! }],
      }, trace.setSpan(context.active(), batchSpan));

      try {
        await processMessage(msg);
      } finally {
        childSpan.end();
      }
    }
  } finally {
    batchSpan.end();
  }
}

It works, but it's a lot of ceremony for something we all want to be the default.

The Solution: Thin Wrappers, Correct Defaults

With Autotel, the same instrumentation uses withProducerSpan() and withConsumerSpan():

// Producer: automatic PRODUCER span kind, semantic attributes
import { withProducerSpan, injectTraceHeaders } from "@demo/observability/kafka";

async function sendEvent(payload: Envelope, headers: Record<string, string>) {
  await withProducerSpan(
    "orders",
    { messageKey: payload.correlation_id },
    async () => {
      const h = injectTraceHeaders(headers);
      await producer.send({
        messages: [{
          topic: "orders",
          key: payload.correlation_id,
          value: JSON.stringify(payload),
          headers: h,
        }],
      });
    },
  );
}
// Span includes: messaging.system=kafka, messaging.destination.name=orders,
// messaging.operation=publish, SpanKind=PRODUCER
// Error handling: recordException() + setStatus(ERROR) on throw
// Consumer: automatic CONSUMER span kind, semantic attributes
import { withConsumerSpan, normalizeHeaders } from "@demo/observability/kafka";

for await (const msg of stream) {
  const headers = normalizeHeaders(msg.headers);
  const messageMeta = { partition: msg.partition, offset: String(msg.offset) };

  await withConsumerSpan(
    "orders",
    "email-worker",
    headers,
    messageMeta,
    async () => {
      await processMessage(msg);
    },
  );
}
// Span includes: messaging.system=kafka, messaging.destination.name=orders,
// messaging.operation=process, messaging.kafka.consumer.group=email-worker,
// messaging.kafka.partition, messaging.kafka.offset, SpanKind=CONSUMER
// Context extracted from headers automatically

Batch Spans with Links (No Child Span Creation)

For batch processing, extractBatchLineage() creates links without manual child spans:

import { extractBatchLineage, withProcessingSpan, SEMATTRS_LINKED_TRACE_ID_COUNT, SEMATTRS_LINKED_TRACE_ID_HASH } from "@demo/observability/kafka";

async function doFlush() {
  const items = batch;
  batch = [];

  // Extract lineage: trace links + summary hash
  const lineage = extractBatchLineage(
    items.map((it) => ({ headers: it.headers })),
    { maxLinks: 50 },
  );

  await withProcessingSpan(
    {
      name: "v1.settlements.batched process",
      headers: {},
      contextMode: "none",  // New trace for batch
      links: lineage.links,  // Links to contributing traces
      topic: "settlements",
      consumerGroup: "batcher",
    },
    async (span) => {
      // Summary attributes for querying
      span.setAttribute(SEMATTRS_LINKED_TRACE_ID_COUNT, lineage.linked_trace_id_count);
      span.setAttribute(SEMATTRS_LINKED_TRACE_ID_HASH, lineage.linked_trace_id_hash);
      await processBatch(items);
    },
  );
}

The batch span carries:

No manual child span creation. No looping through messages to create links.

Manual Instrumentation vs Autotel

Manual Instrumentation Autotel
tracer.startSpan() with SpanKind.PRODUCER withProducerSpan()
tracer.startSpan() with SpanKind.CONSUMER withConsumerSpan()
Manual propagation.inject() injectTraceHeaders()
Manual propagation.extract() Automatic in withConsumerSpan()
Manual span.setAttribute() for each convention Semantic conventions automatic
Try/catch with recordException() + setStatus(ERROR) Error handling in wrapper
span.end() in finally block Automatic on wrapper completion
Manual child spans for batch extractBatchLineage() with links
Different code per broker Same API (withProducerSpan / withConsumerSpan)

What the Wrappers Provide

The withProducerSpan() and withConsumerSpan() wrappers are thin layers over autotel-plugins:

// packages/observability/src/kafka.ts
import { withProducerSpan as pluginWithProducerSpan } from "autotel-plugins/kafka";

export async function withProducerSpan<T>(
  topic: string,
  options: ProducerSpanOptions | undefined,
  fn: () => Promise<T>,
): Promise<T> {
  return pluginWithProducerSpan(
    {
      name: `${topic} ${MESSAGING_OPERATION_PUBLISH}`,
      topic,
      messageKey: options?.messageKey,
    },
    async () => fn(),
  );
}

The plugin handles:

You write business logic. The wrapper handles observability ceremony. Forwarded retry or dead-letter messages preserve trace context and correlation_id so failures remain observable across hops.

Try It Yourself

Clone the demo and see automatic instrumentation:

git clone https://github.com/jagreehal/event-driven-observability-demo
cd event-driven-observability-demo
pnpm install
docker compose up -d  # Kafka, Jaeger
pnpm build && pnpm start

Trigger a checkout:

curl -sX POST http://localhost:3000/rpc/checkout/start \
  -H "content-type: application/json" \
  -d '{"cartValue":149.99,"itemCount":2,"userId":"usr_123","tenantId":"acme"}' | jq

Open Jaeger at http://localhost:16686. Find the trace. You'll see:

All semantic conventions set. No manual attribute code in the application.

Evidence from a live demo run

Here's actual output from a demo run (2026-02-07):

Jaeger trace (queried via Jaeger API):

{
  "traceID": "429a7e3408f6408f259348df534db2fc",
  "spanCount": 3,
  "services": ["api", "worker-email"],
  "spans": [
    {
      "operation": "checkout/start",
      "kind": "internal",
      "service": "api"
    },
    {
      "operation": "domain-events publish",
      "kind": "producer",
      "attributes": {
        "messaging.system": "kafka",
        "messaging.destination.name": "domain-events",
        "messaging.operation": "publish",
        "messaging.kafka.message.key": "429a7e3408f6408f"
      }
    },
    {
      "operation": "domain-events process",
      "kind": "consumer",
      "attributes": {
        "messaging.system": "kafka",
        "messaging.destination.name": "domain-events",
        "messaging.kafka.consumer.group": "demo-v1",
        "messaging.kafka.partition": 2,
        "messaging.kafka.offset": "0"
      }
    }
  ]
}

Same traceID across api and worker-email; PRODUCER and CONSUMER span kinds; all messaging semantic conventions set automatically by withProducerSpan / withConsumerSpan.

Repo: github.com/jagreehal/event-driven-observability-demo

tracing autotel opentelemetry kafka observability-series