Instrumenting Message Queues? Autotel Handles the Ceremony
18 Jan 2026

The OneUptime team is spot on in their Instrument Message Queues with OpenTelemetry post.
Inject trace context on the producer, extract on the consumer; use PRODUCER and CONSUMER span kinds; set semantic conventions (messaging.system, messaging.destination.name, messaging.operation, Kafka partition/offset/consumer group).
They show the raw OpenTelemetry code. It's comprehensive. It's also verbose. Every team ends up re-implementing the same patterns: inject, extract, span kinds, semantic attributes, error handling.
We've all been there: copying "best practice" code from blog posts and adapting it for our broker.
Their key insight:
For batch processing, use a batch span with links or child spans to contributing traces.
But there's still a gap: knowing the correct pattern doesn't stop every service from re-implementing it by hand.
I built Autotel to close that gap.
While this demo uses Kafka, the same PRODUCER/CONSUMER span patterns apply to other brokers.
Autotel makes message queue instrumentation the default. PRODUCER/CONSUMER span kinds automatic. Semantic attributes automatic. Error handling with recordException() and setStatus(ERROR) in the wrappers. Batch spans with links via extractBatchLineage().
The Problem You're Solving
The OneUptime article shows what correct instrumentation looks like:
// Manual approach: producer span with all the ceremony
import { trace, context, propagation, SpanKind, SpanStatusCode } from '@opentelemetry/api';

const tracer = trace.getTracer('my-service');

async function publishMessage(topic: string, payload: any) {
  const span = tracer.startSpan(`${topic} publish`, {
    kind: SpanKind.PRODUCER,
    attributes: {
      'messaging.system': 'kafka',
      'messaging.destination.name': topic,
      'messaging.operation': 'publish',
    },
  });
  try {
    const headers: Record<string, string> = {};
    // Inject trace context into headers (the producer span must be set on the
    // context being injected, or the consumer will link to the wrong parent)
    propagation.inject(trace.setSpan(context.active(), span), headers, {
      set: (carrier, key, value) => { carrier[key] = value; },
    });
    await producer.send({
      topic,
      messages: [{ value: JSON.stringify(payload), headers }],
    });
  } catch (error) {
    span.recordException(error as Error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
    throw error;
  } finally {
    span.end();
  }
}
// Manual approach: consumer span with semantic conventions
async function consumeMessage(topic: string, consumerGroup: string, msg: KafkaMessage) {
  // Extract trace context from headers (a TextMapGetter needs both get and keys)
  const parentContext = propagation.extract(context.active(), msg.headers, {
    get: (carrier, key) => carrier[key]?.toString(),
    keys: (carrier) => Object.keys(carrier),
  });
  const span = tracer.startSpan(`${topic} process`, {
    kind: SpanKind.CONSUMER,
    attributes: {
      'messaging.system': 'kafka',
      'messaging.destination.name': topic,
      'messaging.operation': 'process',
      'messaging.kafka.consumer.group': consumerGroup,
      'messaging.kafka.partition': msg.partition,
      'messaging.kafka.offset': msg.offset.toString(),
    },
  }, parentContext);
  try {
    await context.with(trace.setSpan(parentContext, span), async () => {
      await processMessage(msg);
    });
  } catch (error) {
    span.recordException(error as Error);
    span.setStatus({ code: SpanStatusCode.ERROR, message: (error as Error).message });
    throw error;
  } finally {
    span.end();
  }
}
// Manual approach: batch span with child spans
async function processBatch(messages: KafkaMessage[]) {
  const batchSpan = tracer.startSpan('batch process', {
    kind: SpanKind.CONSUMER,
    attributes: {
      'messaging.batch.message_count': messages.length,
    },
  });
  try {
    for (const msg of messages) {
      const parentContext = propagation.extract(context.active(), msg.headers, {
        get: (carrier, key) => carrier[key]?.toString(),
        keys: (carrier) => Object.keys(carrier),
      });
      // Guard: headers without trace context yield no span context to link
      const linkedContext = trace.getSpanContext(parentContext);
      const childSpan = tracer.startSpan('message process', {
        kind: SpanKind.CONSUMER,
        links: linkedContext ? [{ context: linkedContext }] : [],
      }, trace.setSpan(context.active(), batchSpan));
      try {
        await processMessage(msg);
      } finally {
        childSpan.end();
      }
    }
  } finally {
    batchSpan.end();
  }
}
It works, but it's a lot of ceremony for something we all want to be the default.
The Solution: Thin Wrappers, Correct Defaults
With Autotel, the same instrumentation uses withProducerSpan() and withConsumerSpan():
// Producer: automatic PRODUCER span kind, semantic attributes
import { withProducerSpan, injectTraceHeaders } from "@demo/observability/kafka";

async function sendEvent(payload: Envelope, headers: Record<string, string>) {
  await withProducerSpan(
    "orders",
    { messageKey: payload.correlation_id },
    async () => {
      const h = injectTraceHeaders(headers);
      await producer.send({
        topic: "orders",
        messages: [{
          key: payload.correlation_id,
          value: JSON.stringify(payload),
          headers: h,
        }],
      });
    },
  );
}
// Span includes: messaging.system=kafka, messaging.destination.name=orders,
// messaging.operation=publish, SpanKind=PRODUCER
// Error handling: recordException() + setStatus(ERROR) on throw
// Consumer: automatic CONSUMER span kind, semantic attributes
import { withConsumerSpan, normalizeHeaders } from "@demo/observability/kafka";

for await (const msg of stream) {
  const headers = normalizeHeaders(msg.headers);
  const messageMeta = { partition: msg.partition, offset: String(msg.offset) };
  await withConsumerSpan(
    "orders",
    "email-worker",
    headers,
    messageMeta,
    async () => {
      await processMessage(msg);
    },
  );
}
// Span includes: messaging.system=kafka, messaging.destination.name=orders,
// messaging.operation=process, messaging.kafka.consumer.group=email-worker,
// messaging.kafka.partition, messaging.kafka.offset, SpanKind=CONSUMER
// Context extracted from headers automatically
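The demo's normalizeHeaders exists because kafkajs delivers consumer header values as Buffers, while the propagation API wants plain strings. Its real implementation isn't shown in this post; here's a minimal sketch of what such a helper might look like (the name normalizeHeadersSketch is illustrative, not the library's API):

```typescript
// Hypothetical sketch (not the demo's actual source): kafkajs hands the
// consumer header values as Buffer | string | undefined, so decode each
// value to a UTF-8 string and drop missing entries before extraction.
function normalizeHeadersSketch(
  headers: Record<string, Buffer | string | undefined> = {},
): Record<string, string> {
  const out: Record<string, string> = {};
  for (const [key, value] of Object.entries(headers)) {
    if (value === undefined) continue;
    out[key] = typeof value === "string" ? value : value.toString("utf8");
  }
  return out;
}
```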
Batch Spans with Links (No Child Span Creation)
For batch processing, extractBatchLineage() creates links without manual child spans:
import {
  extractBatchLineage,
  withProcessingSpan,
  SEMATTRS_LINKED_TRACE_ID_COUNT,
  SEMATTRS_LINKED_TRACE_ID_HASH,
} from "@demo/observability/kafka";

async function doFlush() {
  const items = batch;
  batch = [];
  // Extract lineage: trace links + summary hash
  const lineage = extractBatchLineage(
    items.map((it) => ({ headers: it.headers })),
    { maxLinks: 50 },
  );
  await withProcessingSpan(
    {
      name: "v1.settlements.batched process",
      headers: {},
      contextMode: "none", // New trace for batch
      links: lineage.links, // Links to contributing traces
      topic: "settlements",
      consumerGroup: "batcher",
    },
    async (span) => {
      // Summary attributes for querying
      span.setAttribute(SEMATTRS_LINKED_TRACE_ID_COUNT, lineage.linked_trace_id_count);
      span.setAttribute(SEMATTRS_LINKED_TRACE_ID_HASH, lineage.linked_trace_id_hash);
      await processBatch(items);
    },
  );
}
The batch span carries:
- Links to contributing traces (capped by `maxLinks` to avoid payload explosion)
- `linked_trace_id_count`: Number of traces that contributed
- `linked_trace_id_hash`: Stable hash for "same set of inputs" queries

No manual child span creation. No looping through messages to create links.
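The post doesn't show extractBatchLineage's internals, but the behaviour described above (deduplicated links capped at maxLinks, a count, an order-stable hash) can be sketched without the OpenTelemetry SDK by parsing the W3C traceparent header directly. Everything here (extractBatchLineageSketch, parseTraceparent) is an illustrative assumption, not the library's actual code:

```typescript
// Hypothetical sketch of extractBatchLineage-like behaviour, assuming
// W3C Trace Context headers. Not the library's implementation.
import { createHash } from "node:crypto";

interface BatchMessage {
  headers: Record<string, string>;
}

interface SpanContextLite {
  traceId: string;
  spanId: string;
  traceFlags: number;
}

// traceparent format: version-traceId-spanId-flags, e.g. 00-<32 hex>-<16 hex>-01
function parseTraceparent(header?: string): SpanContextLite | undefined {
  const m = header?.match(/^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$/);
  if (!m) return undefined;
  return { traceId: m[1], spanId: m[2], traceFlags: parseInt(m[3], 16) };
}

function extractBatchLineageSketch(
  messages: BatchMessage[],
  { maxLinks = 50 }: { maxLinks?: number } = {},
) {
  const links: { context: SpanContextLite }[] = [];
  const traceIds = new Set<string>();
  for (const msg of messages) {
    const sc = parseTraceparent(msg.headers["traceparent"]);
    if (!sc) continue;
    // One link per distinct trace, capped to bound span payload size
    if (!traceIds.has(sc.traceId) && links.length < maxLinks) {
      links.push({ context: sc });
    }
    traceIds.add(sc.traceId);
  }
  // Stable hash: same set of contributing trace IDs => same hash,
  // independent of message order or duplicates within the batch.
  const hash = createHash("sha256")
    .update([...traceIds].sort().join(","))
    .digest("hex")
    .slice(0, 16);
  return { links, linked_trace_id_count: traceIds.size, linked_trace_id_hash: hash };
}
```

The count and hash survive even when links are capped, which is what makes "same set of inputs" queries possible on large batches.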
Manual Instrumentation vs Autotel
| Manual Instrumentation | Autotel |
|---|---|
| `tracer.startSpan()` with `SpanKind.PRODUCER` | `withProducerSpan()` |
| `tracer.startSpan()` with `SpanKind.CONSUMER` | `withConsumerSpan()` |
| Manual `propagation.inject()` | `injectTraceHeaders()` |
| Manual `propagation.extract()` | Automatic in `withConsumerSpan()` |
| Manual `span.setAttribute()` for each convention | Semantic conventions automatic |
| Try/catch with `recordException()` + `setStatus(ERROR)` | Error handling in wrapper |
| `span.end()` in finally block | Automatic on wrapper completion |
| Manual child spans for batch | `extractBatchLineage()` with links |
| Different code per broker | Same API (`withProducerSpan` / `withConsumerSpan`) |
What the Wrappers Provide
The withProducerSpan() and withConsumerSpan() wrappers are thin layers over autotel-plugins:
// packages/observability/src/kafka.ts
import { withProducerSpan as pluginWithProducerSpan } from "autotel-plugins/kafka";

export async function withProducerSpan<T>(
  topic: string,
  options: ProducerSpanOptions | undefined,
  fn: () => Promise<T>,
): Promise<T> {
  return pluginWithProducerSpan(
    {
      // MESSAGING_OPERATION_PUBLISH ("publish") is a constant defined elsewhere in this file
      name: `${topic} ${MESSAGING_OPERATION_PUBLISH}`,
      topic,
      messageKey: options?.messageKey,
    },
    async () => fn(),
  );
}
The plugin handles:
- SpanKind.PRODUCER / SpanKind.CONSUMER
- Semantic attributes: `messaging.system`, `messaging.destination.name`, `messaging.operation`, Kafka-specific (`partition`, `offset`, `consumer.group`)
- Error handling: `recordException(error)` + `setStatus({ code: SpanStatusCode.ERROR })`
- Context propagation: inject on producer, extract on consumer
You write business logic. The wrapper handles observability ceremony. Forwarded retry or dead-letter messages preserve trace context and correlation_id so failures remain observable across hops.
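The retry/dead-letter forwarding mechanism isn't shown in this post, but the core move is simply copying the tracing and correlation headers onto the forwarded message. A minimal sketch, assuming W3C Trace Context header names plus the demo's correlation_id (forwardHeaders is an illustrative name, not the demo's API):

```typescript
// Hypothetical sketch (not the demo's actual code): when republishing a
// failed message to a retry or dead-letter topic, carry over the headers
// that keep the next hop linked to the original trace and correlation ID.
const FORWARDED_HEADERS = ["traceparent", "tracestate", "correlation_id"];

function forwardHeaders(
  original: Record<string, string>,
  extra: Record<string, string> = {},
): Record<string, string> {
  const out: Record<string, string> = { ...extra };
  for (const key of FORWARDED_HEADERS) {
    if (original[key] !== undefined) out[key] = original[key];
  }
  return out;
}
```

Copying only an allowlist of headers (rather than everything) keeps unrelated application headers from leaking across topics.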
Try It Yourself
Clone the demo and see automatic instrumentation:
git clone https://github.com/jagreehal/event-driven-observability-demo
cd event-driven-observability-demo
pnpm install
docker compose up -d # Kafka, Jaeger
pnpm build && pnpm start
Trigger a checkout:
curl -sX POST http://localhost:3000/rpc/checkout/start \
-H "content-type: application/json" \
-d '{"cartValue":149.99,"itemCount":2,"userId":"usr_123","tenantId":"acme"}' | jq
Open Jaeger at http://localhost:16686. Find the trace. You'll see:
- PRODUCER spans with `messaging.system=kafka`, `messaging.destination.name`, `messaging.operation=publish`
- CONSUMER spans with `messaging.kafka.consumer.group`, `messaging.kafka.partition`, `messaging.kafka.offset`
- Batch spans with `linked_trace_id_count` and `linked_trace_id_hash`
All semantic conventions set. No manual attribute code in the application.
Evidence from a live demo run
Here's actual output from a demo run (2026-02-07):
Jaeger trace (queried via Jaeger API):
{
"traceID": "429a7e3408f6408f259348df534db2fc",
"spanCount": 3,
"services": ["api", "worker-email"],
"spans": [
{
"operation": "checkout/start",
"kind": "internal",
"service": "api"
},
{
"operation": "domain-events publish",
"kind": "producer",
"attributes": {
"messaging.system": "kafka",
"messaging.destination.name": "domain-events",
"messaging.operation": "publish",
"messaging.kafka.message.key": "429a7e3408f6408f"
}
},
{
"operation": "domain-events process",
"kind": "consumer",
"attributes": {
"messaging.system": "kafka",
"messaging.destination.name": "domain-events",
"messaging.kafka.consumer.group": "demo-v1",
"messaging.kafka.partition": 2,
"messaging.kafka.offset": "0"
}
}
]
}
Same traceID across api and worker-email; PRODUCER and CONSUMER span kinds; all messaging semantic conventions set automatically by withProducerSpan / withConsumerSpan.