Message Isolation? Autotel Makes Tenant Context Flow
17 Jan 2026

The Signadot team is spot on in their Testing Event-Driven Architectures with OpenTelemetry post.
Message isolation using a shared queue: propagate tenant ID in Kafka message headers; consumers use tenant ID for selective message consumption.
They make the case that infrastructure duplication is expensive. Instead of separate Kafka clusters per environment, use tenant ID filtering on a shared queue. Instrument producers and consumers for context propagation.
We've all been there: maintaining four "identical" Kafka setups that slowly drift apart.
Their key insight:
Requires modifying consumers and using OpenTelemetry for context propagation.
But there's still a gap...
I built Autotel to close that gap.
Autotel does not invent a new isolation mechanism; it standardizes how existing OpenTelemetry context propagation is applied across services.
Autotel makes message isolation practical. One ID for selective consumption AND observability. No per-service boilerplate for context propagation.
The Problem You're Solving
You want to test async workflows without duplicating infrastructure. The Signadot article describes the approach:
- Tag messages with tenant ID at the producer
- Propagate context through message headers
- Consumers filter by tenant ID (selective consumption)
- Only process messages for your tenant/environment
The manual implementation looks like this:
// Manual approach: producer adds tenant context
import { context, propagation } from "@opentelemetry/api";

// `producer` is assumed to be a connected kafkajs Producer
async function publish(topic: string, payload: unknown, tenantId: string) {
  const headers: Record<string, string> = {
    "x-tenant-id": tenantId,
  };
  // Must inject trace context manually
  propagation.inject(context.active(), headers, {
    set: (carrier, key, value) => { carrier[key] = value; },
  });
  await producer.send({
    topic,
    messages: [{ value: JSON.stringify(payload), headers }],
  });
}
// Consumer: extract and filter
import { context, propagation } from "@opentelemetry/api";
import type { KafkaMessage } from "kafkajs";

async function consume(msg: KafkaMessage) {
  const headers = normalizeHeaders(msg.headers);
  const tenantId = headers["x-tenant-id"];
  // Filter: skip if not my tenant
  if (tenantId !== process.env.MY_TENANT) {
    return;
  }
  // Extract trace context manually
  const parentContext = propagation.extract(context.active(), headers, {
    get: (carrier, key) => carrier[key]?.toString(),
  });
  await context.with(parentContext, async () => {
    await processMessage(msg);
  });
}
It works, but it's a lot of ceremony for something we all want to be the default.
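The `normalizeHeaders` helper used above isn't shown in the snippets. A minimal sketch of what it might look like, assuming kafkajs-style headers where values arrive as `Buffer | string | undefined` (the lowercasing of keys is a design choice of this sketch, not something the demo necessarily does):

```typescript
// Hypothetical sketch: kafkajs delivers header values as Buffer | string | undefined.
// Normalize them to plain strings so propagation.extract() can read them.
type RawHeaders = Record<string, Buffer | string | undefined> | undefined;

function normalizeHeaders(raw: RawHeaders): Record<string, string> {
  const headers: Record<string, string> = {};
  if (!raw) return headers;
  for (const [key, value] of Object.entries(raw)) {
    if (value === undefined) continue; // skip absent header values
    // Buffers (binary header values) become UTF-8 strings; keys are lowercased
    headers[key.toLowerCase()] = Buffer.isBuffer(value) ? value.toString("utf8") : value;
  }
  return headers;
}
```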
The Solution: Automatic Context Propagation
With Autotel, context propagation is automatic:
// Kafka consumer with context mode toggle
import { context, ROOT_CONTEXT } from "@opentelemetry/api";
import { withConsumerSpan, normalizeHeaders, HEADER_CORRELATION_ID } from "@demo/observability/kafka";

// `stream`, `topic`, and `consumerGroup` come from the consumer setup
for await (const msg of stream) {
  const headers = normalizeHeaders(msg.headers);
  const messageMeta = { partition: msg.partition, offset: String(msg.offset) };

  // Selective consumption: filter by correlation_id or tenant
  const correlationId = headers[HEADER_CORRELATION_ID];
  if (shouldFilter(correlationId)) {
    continue; // Skip, not my tenant/request
  }

  if (process.env.DEMO_PROPAGATE_TRACE) {
    // Context mode: inherit trace from producer
    await withConsumerSpan(topic, consumerGroup, headers, messageMeta, async () => {
      await trace("worker-email.handle", async () => {
        await handleOrderCreated(msg);
      });
    });
  } else {
    // Context mode: none (detach from any active context; new trace per message)
    await context.with(ROOT_CONTEXT, async () => {
      await trace("worker-email.handle", async () => {
        await handleOrderCreated(msg);
      });
    });
  }
}
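Under the hood, the context that `withConsumerSpan` restores travels in the standard W3C traceparent header the producer injected. As an illustration of that header's shape (this parser is not part of Autotel or the demo, just a sketch of the W3C Trace Context format):

```typescript
// W3C Trace Context: traceparent = "<version>-<trace-id>-<parent-span-id>-<flags>"
// e.g. "00-429a7e3408f6408f259348df534db2fc-00f067aa0ba902b7-01"
interface TraceParent {
  version: string;  // 2 hex chars
  traceId: string;  // 32 hex chars
  parentId: string; // 16 hex chars
  flags: string;    // 2 hex chars; "01" = sampled
}

function parseTraceparent(header: string): TraceParent | undefined {
  const match = /^([\da-f]{2})-([\da-f]{32})-([\da-f]{16})-([\da-f]{2})$/.exec(header);
  if (!match) return undefined; // malformed: consumers start a fresh trace instead
  return { version: match[1], traceId: match[2], parentId: match[3], flags: match[4] };
}
```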
One ID for Two Purposes
The correlation_id serves dual purposes:
- Selective consumption: "Should I process this message?"
- Observability join key: "Show me everything for this request"
// At the API boundary
const correlationId = getOrCreateCorrelationId();

// In the message
const payload = {
  event: "v1.orders.created",
  correlation_id: correlationId,
  data: { order_id: "order-123" },
};

// In the headers
const headers = injectTraceHeaders({
  [HEADER_CORRELATION_ID]: correlationId,
  "x-event-name": "v1.orders.created",
});
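`getOrCreateCorrelationId` isn't defined in the snippet. A plausible sketch of its intent (the signature, the tenant argument, and the tenant-prefix scheme here are assumptions for illustration, not Autotel's API — the real helper likely pulls both from the request context):

```typescript
import { randomBytes } from "node:crypto";

// Hypothetical helper: reuse the caller's correlation ID if one arrived on the
// inbound request, otherwise mint a fresh one prefixed with the tenant so that
// consumers can filter on it later.
function getOrCreateCorrelationId(
  incoming: string | undefined,
  tenantId: string,
): string {
  if (incoming) return incoming; // preserve an ID minted upstream
  return `${tenantId}-${randomBytes(8).toString("hex")}`;
}
```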
Now consumers can filter. Selective consumption should still acknowledge or forward skipped messages according to your retry/DLQ strategy.
function shouldFilter(correlationId: string | undefined): boolean {
  if (!process.env.ISOLATION_TENANT) return false;
  // correlation_id can encode tenant directly, or reference it indirectly
  return !correlationId?.startsWith(process.env.ISOLATION_TENANT);
}
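A quick check of the filter's behavior, with the tenant passed in explicitly so it's easy to exercise (the env-var version above reads process.env.ISOLATION_TENANT; the tenant-prefix scheme is the one the snippet assumes):

```typescript
// Same logic as shouldFilter(), parameterized for testing: a message is kept
// when no isolation tenant is configured, or when its correlation_id carries
// this consumer's tenant prefix; everything else is skipped.
function shouldFilterFor(
  correlationId: string | undefined,
  isolationTenant: string | undefined,
): boolean {
  if (!isolationTenant) return false; // no isolation configured: consume everything
  return !correlationId?.startsWith(isolationTenant); // skip other tenants' messages
}
```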
And observability tools can join:
-- All traces for this request
SELECT * FROM traces WHERE tags['correlation_id'] = 'tenant-123-checkout-abc';
-- All events for this request
SELECT * FROM events WHERE correlation_id = 'tenant-123-checkout-abc';
-- All logs for this request
SELECT * FROM logs WHERE correlation_id = 'tenant-123-checkout-abc';
Manual Message Isolation vs Autotel
| Manual Message Isolation | Autotel |
|---|---|
| Manual propagation.inject() | injectTraceHeaders() |
| Manual propagation.extract() | Automatic in withConsumerSpan() |
| Separate tenant ID and trace context | correlation_id serves both |
| Per-service boilerplate | Thin wrappers over autotel-plugins |
| Easy to forget context in one service | Context flows automatically |
Try It Yourself
Clone the demo and see message isolation in action:
git clone https://github.com/jagreehal/event-driven-observability-demo
cd event-driven-observability-demo
pnpm install
docker compose up -d # Kafka, Jaeger
pnpm build && pnpm start
Trigger a checkout:
curl -sX POST http://localhost:3000/rpc/checkout/start \
-H "content-type: application/json" \
-d '{"cartValue":149.99,"itemCount":2,"userId":"usr_123","tenantId":"acme"}' | jq
Watch worker logs; you'll see correlation_id flowing through every service. Open Jaeger at http://localhost:16686 and search by that ID — all traces, all services, one query.
Evidence from a live demo run
Here's actual output from a demo run (2026-02-07):
API response:
{ "ok": true, "correlation_id": "429a7e3408f6408f", "published": "v1.orders.created" }
Jaeger trace (one trace across API and worker):
{
"traceID": "429a7e3408f6408f259348df534db2fc",
"services": ["api", "worker-email"],
"spans": [
{ "operation": "checkout/start", "service": "api", "kind": "internal" },
{ "operation": "domain-events publish", "service": "api", "kind": "producer" },
{ "operation": "domain-events process", "service": "worker-email", "kind": "consumer" }
]
}
Event with correlation_id (from analytics sink):
{
"name": "v1.checkouts.started",
"autotel": {
"correlation_id": "429a7e3408f6408f",
"trace_id": "429a7e3408f6408f259348df534db2fc",
"trace_url": "http://localhost:16686/trace/429a7e3408f6408f259348df534db2fc"
}
}
Same correlation_id across API response, Kafka message key, and event payload. One ID for isolation and observability.