Arrange Act Assert

Jag Reehal's thinking on things, mostly product development

Message Isolation? Autotel Makes Tenant Context Flow

17 Jan 2026

The Signadot team is spot on in their "Testing Event-Driven Architectures with OpenTelemetry" post.

Message isolation using a shared queue: propagate tenant ID in Kafka message headers; consumers use tenant ID for selective message consumption.

They make the case that infrastructure duplication is expensive. Instead of separate Kafka clusters per environment, use tenant ID filtering on a shared queue. Instrument producers and consumers for context propagation.

We've all been there: maintaining four "identical" Kafka setups that slowly drift apart.

Their key insight:

Requires modifying consumers and using OpenTelemetry for context propagation.

But there's still a gap...

I built Autotel to close that gap.

Autotel does not invent a new isolation mechanism; it standardizes how existing OpenTelemetry context propagation is applied consistently across services.

Autotel makes message isolation practical. One ID for selective consumption AND observability. No per-service boilerplate for context propagation.

The Problem You're Solving

You want to test async workflows without duplicating infrastructure. The Signadot article describes the approach:

  1. Tag messages with tenant ID at the producer
  2. Propagate context through message headers
  3. Consumers filter by tenant ID (selective consumption)
  4. Only process messages for your tenant/environment

The manual implementation looks like this:

// Manual approach: producer adds tenant context
import { context, propagation } from "@opentelemetry/api";

async function publish(topic: string, payload: unknown, tenantId: string) {
  const headers: Record<string, string> = {
    'x-tenant-id': tenantId,
  };

  // Must inject trace context manually
  propagation.inject(context.active(), headers, {
    set: (carrier, key, value) => { carrier[key] = value; }
  });

  await producer.send({
    topic,
    messages: [{ value: JSON.stringify(payload), headers }],
  });
}

// Consumer: extract and filter
import { context, propagation } from "@opentelemetry/api";
import type { KafkaMessage } from "kafkajs";

async function consume(msg: KafkaMessage) {
  const headers = normalizeHeaders(msg.headers);
  const tenantId = headers['x-tenant-id'];

  // Filter: skip if not my tenant
  if (tenantId !== process.env.MY_TENANT) {
    return;
  }

  // Extract trace context manually
  const parentContext = propagation.extract(context.active(), headers, {
    get: (carrier, key) => carrier[key]?.toString()
  });

  await context.with(parentContext, async () => {
    await processMessage(msg);
  });
}
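Both snippets lean on a normalizeHeaders helper. Here's a minimal sketch of what such a helper might look like (an assumption for illustration, not the demo's actual implementation): kafkajs delivers header values as Buffer | undefined, but propagation.extract and tenant filtering want plain strings.

```typescript
// Hypothetical normalizeHeaders sketch: convert raw Kafka headers
// (Buffer | string | undefined values) into a plain string record.
type RawHeaders = Record<string, Buffer | string | undefined> | undefined;

function normalizeHeaders(raw: RawHeaders): Record<string, string> {
  const headers: Record<string, string> = {};
  for (const [key, value] of Object.entries(raw ?? {})) {
    if (value !== undefined) {
      // Buffers become UTF-8 strings; strings pass through unchanged.
      // Keys are lowercased so lookups like headers['x-tenant-id'] are stable.
      headers[key.toLowerCase()] = value.toString();
    }
  }
  return headers;
}
```

Lowercasing keys is a design choice here: Kafka header names are case-sensitive bytes, so normalizing once at the boundary avoids subtle mismatches downstream.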

It works, but it's a lot of ceremony for something we all want to be the default.

The Solution: Automatic Context Propagation

With Autotel, context propagation is automatic:

// Kafka consumer with context mode toggle
import { context } from "@opentelemetry/api";
import { withConsumerSpan, normalizeHeaders, HEADER_CORRELATION_ID } from "@demo/observability/kafka";

for await (const msg of stream) {
  const headers = normalizeHeaders(msg.headers);
  const messageMeta = { partition: msg.partition, offset: String(msg.offset) };

  // Selective consumption: filter by correlation_id or tenant
  const correlationId = headers[HEADER_CORRELATION_ID];
  if (shouldFilter(correlationId)) {
    continue;  // Skip, not my tenant/request
  }

  if (process.env.DEMO_PROPAGATE_TRACE) {
    // Context mode: inherit trace from producer
    await withConsumerSpan(topic, consumerGroup, headers, messageMeta, async () => {
      await trace("worker-email.handle", async () => {
        await handleOrderCreated(msg);
      });
    });
  } else {
    // Context mode: none (new trace per message)
    await context.with(context.active(), async () => {
      await trace("worker-email.handle", async () => {
        await handleOrderCreated(msg);
      });
    });
  }
}

One ID for Two Purposes

The correlation_id serves dual purposes:

  1. Selective consumption: "Should I process this message?"
  2. Observability join key: "Show me everything for this request"

// At the API boundary
const correlationId = getOrCreateCorrelationId();

// In the message
const payload = {
  event: "v1.orders.created",
  correlation_id: correlationId,
  data: { order_id: "order-123" },
};

// In the headers
const headers = injectTraceHeaders({
  [HEADER_CORRELATION_ID]: correlationId,
  "x-event-name": "v1.orders.created",
});
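The getOrCreateCorrelationId helper above does the "one ID" work at the API boundary. Here's a minimal sketch of how such a helper could behave (an assumption; the demo's real implementation may differ): reuse an incoming id when the caller supplied one, otherwise mint a fresh one.

```typescript
import { randomBytes } from "node:crypto";

// Hypothetical getOrCreateCorrelationId sketch: propagate the caller's
// correlation id end-to-end if present, otherwise generate a new one.
function getOrCreateCorrelationId(incoming?: string): string {
  if (incoming && incoming.length > 0) {
    return incoming; // reuse: keeps one id across service hops
  }
  // 8 random bytes -> 16 hex chars, matching the id shape seen in the
  // demo output ("429a7e3408f6408f")
  return randomBytes(8).toString("hex");
}
```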

Now consumers can filter. Selective consumption should still acknowledge or forward skipped messages according to your retry/DLQ strategy.

function shouldFilter(correlationId: string | undefined): boolean {
  if (!process.env.ISOLATION_TENANT) return false;

  // correlation_id can encode tenant directly, or reference it indirectly
  return !correlationId?.startsWith(process.env.ISOLATION_TENANT);
}
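To make the filter's behaviour concrete, here's the same logic with the tenant passed in explicitly rather than read from process.env (a test-friendly variant for illustration, not the demo's exported API):

```typescript
// Same decision logic as shouldFilter above, parameterized for clarity:
// no isolation tenant configured -> consume everything;
// otherwise skip any message whose correlation_id doesn't match.
function shouldFilterFor(
  correlationId: string | undefined,
  isolationTenant: string | undefined,
): boolean {
  if (!isolationTenant) return false; // no isolation: process all messages
  // Missing correlation_id counts as "not mine" and is filtered out
  return !correlationId?.startsWith(isolationTenant);
}
```

So with ISOLATION_TENANT=tenant-123, a message tagged tenant-123-checkout-abc is processed, while tenant-999-checkout-xyz (or a message with no correlation_id at all) is skipped.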

And observability tools can join:

-- All traces for this request
SELECT * FROM traces WHERE tags['correlation_id'] = 'tenant-123-checkout-abc';

-- All events for this request
SELECT * FROM events WHERE correlation_id = 'tenant-123-checkout-abc';

-- All logs for this request
SELECT * FROM logs WHERE correlation_id = 'tenant-123-checkout-abc';

Manual Message Isolation vs Autotel

Manual message isolation                   Autotel
-----------------------------------------  ------------------------------------
Manual propagation.inject()                injectTraceHeaders()
Manual propagation.extract()               Automatic in withConsumerSpan()
Separate tenant ID and trace context       correlation_id serves both
Per-service boilerplate                    Thin wrappers over autotel-plugins
Easy to forget context in one service      Context flows automatically

Try It Yourself

Clone the demo and see message isolation in action:

git clone https://github.com/jagreehal/event-driven-observability-demo
cd event-driven-observability-demo
pnpm install
docker compose up -d  # Kafka, Jaeger
pnpm build && pnpm start

Trigger a checkout:

curl -sX POST http://localhost:3000/rpc/checkout/start \
  -H "content-type: application/json" \
  -d '{"cartValue":149.99,"itemCount":2,"userId":"usr_123","tenantId":"acme"}' | jq

Watch worker logs; you'll see correlation_id flowing through every service. Open Jaeger at http://localhost:16686 and search by that ID — all traces, all services, one query.

Evidence from a live demo run

Here's actual output from a demo run (2026-02-07):

API response:

{ "ok": true, "correlation_id": "429a7e3408f6408f", "published": "v1.orders.created" }

Jaeger trace (one trace across API and worker):

{
  "traceID": "429a7e3408f6408f259348df534db2fc",
  "services": ["api", "worker-email"],
  "spans": [
    { "operation": "checkout/start", "service": "api", "kind": "internal" },
    { "operation": "domain-events publish", "service": "api", "kind": "producer" },
    { "operation": "domain-events process", "service": "worker-email", "kind": "consumer" }
  ]
}

Event with correlation_id (from analytics sink):

{
  "name": "v1.checkouts.started",
  "autotel": {
    "correlation_id": "429a7e3408f6408f",
    "trace_id": "429a7e3408f6408f259348df534db2fc",
    "trace_url": "http://localhost:16686/trace/429a7e3408f6408f259348df534db2fc"
  }
}

Same correlation_id across API response, Kafka message key, and event payload. One ID for isolation and observability.

Repo: github.com/jagreehal/event-driven-observability-demo

testing autotel opentelemetry kafka observability-series