Arrange Act Assert

Jag Reehals thinking on things, mostly product development

Request-Level Isolation? Autotel Propagates Context Automatically

16 Jan 2026

The CNCF team is spot on in their Testing Asynchronous Workflows using OpenTelemetry and Istio post.

Request-level isolation is the most cost-effective approach.

They make the case against duplicating infrastructure for testing. Instead of spinning up separate Kafka clusters per tenant, use OpenTelemetry Baggage to propagate tenant ID through async flows. Consumers filter by tenant ID. Istio handles routing.

We've all been there: every team has their own "staging Kafka" and costs balloon.

Their key insight:

Use OpenTelemetry Baggage to propagate tenant ID through sync and async. When publishing to Kafka, producers inject trace context (including baggage) into message headers; consumers extract and make routing decisions.

But there's still a gap...

I built Autotel to close that gap.

Autotel makes context propagation the default. No manual inject/extract into message headers. No per-service boilerplate. No forgetting to include tenant ID in product events.

The Problem You're Solving

You want request-level isolation for testing. The CNCF article describes the approach:

  1. Set tenant ID in OpenTelemetry Baggage at the edge
  2. Propagate baggage through HTTP (Istio handles it)
  3. Inject baggage into Kafka message headers on publish
  4. Extract baggage on consume, filter by tenant ID
  5. Enrich product events with tenant context

The manual implementation:

// Manual approach: producer injects baggage into headers
import { propagation, context, baggage } from '@opentelemetry/api';

async function publishEvent(topic: string, payload: any) {
  const currentBaggage = propagation.getBaggage(context.active());
  const tenantId = currentBaggage?.getEntry('tenant.id')?.value;

  const headers: Record<string, string> = {};

  // Manual inject - easy to forget
  propagation.inject(context.active(), headers, {
    set: (carrier, key, value) => { carrier[key] = value; }
  });

  // Must remember to include baggage keys explicitly
  if (tenantId) {
    headers['x-tenant-id'] = tenantId;
  }

  await producer.send({
    topic,
    messages: [{ value: JSON.stringify(payload), headers }],
  });
}

// Consumer extracts and filters
async function consume(msg: KafkaMessage) {
  const parentContext = propagation.extract(context.active(), msg.headers, {
    get: (carrier, key) => carrier[key]?.toString()
  });

  const tenantId = msg.headers['x-tenant-id']?.toString();

  // Filter for selective consumption
  if (tenantId !== expectedTenantId) {
    return; // Skip messages for other tenants
  }

  await context.with(parentContext, async () => {
    await processMessage(msg);
  });
}

// And every track() call needs tenant context
track('order.created', {
  tenant_id: tenantId,  // Easy to forget
  order_id: order.id,
});

It works, but it's a lot of ceremony for something we all want to be the default.

The Solution: Context Flows Automatically

With Autotel, injectTraceHeaders() propagates baggage automatically:

// Producer: API sends to Kafka
import { injectTraceHeaders, withProducerSpan } from "@demo/observability/kafka";

function buildHeaders(correlationId: string, eventName: string) {
  return injectTraceHeaders({
    "x-correlation-id": correlationId,
    "x-event-name": eventName,
  });
  // Returns: traceparent, tracestate, x-correlation-id, x-event-name
  // Baggage entries are propagated via tracestate
}

async function sendEvent(payload: Envelope, headers: Record<string, string>) {
  await withProducerSpan("orders", { messageKey: payload.correlation_id }, async () => {
    const h = injectTraceHeaders(headers);
    await producer.send({
      messages: [{
        topic: "orders",
        key: payload.correlation_id,
        value: JSON.stringify(payload),
        headers: h,
      }],
    });
  });
}

Autotel still supports OpenTelemetry Baggage; correlation_id is used as a stable, queryable join key that complements baggage rather than replacing it.

The correlation_id serves as the tenant/request identifier. Use it for:

Prefix-based filtering is one example; correlation_id can also reference tenant via lookup or structured encoding.

Enriching Product Events from Baggage

Configure enrichFromBaggage once at init:

import { init, track } from "autotel";

init({
  service: "api",
  events: {
    includeTraceContext: true,
    traceUrl: (ctx) => `http://localhost:16686/trace/${ctx.traceId}`,
    enrichFromBaggage: {
      allow: ["tenant.id", "user.id", "request.id"],
      deny: ["user.email", "user.ssn"],  // Never include PII
      prefix: "ctx.",
      transform: { "user.id": "hash", "tenant.id": "plain" },
    },
  },
});

// Now every track() call includes baggage context automatically
track("order.created", { order_id: order.id });
// Emitted event includes: ctx.tenant.id, ctx.user.id (hashed), ctx.request.id

No passing tenant ID into every track() call. No forgetting to include context. The event schema stays clean, and observability context is always present.

Manual Baggage Propagation vs Autotel

Manual Baggage Propagation Autotel
propagation.inject() into headers injectTraceHeaders()
propagation.extract() from headers Automatic in withConsumerSpan()
Manually include baggage keys in headers Baggage propagated via tracestate
Pass tenant ID to every track() call enrichFromBaggage config
Different code for HTTP vs Kafka Same pattern everywhere
Easy to forget context in one service Context flows automatically

One ID for Isolation and Observability

The correlation_id is your stable join key:

// At the edge (API gateway or first service)
const correlationId = getOrCreateCorrelationId();  // From autotel

// In message payload and headers
const payload = {
  event: "v1.checkout.started",
  correlation_id: correlationId,  // Same ID everywhere
  data: { order_id: "order-123" },
};

const headers = injectTraceHeaders({
  "x-correlation-id": correlationId,
});

Now you can:

-- Find all traces for a request
SELECT * FROM traces WHERE correlation_id = 'abc-123';

-- Find all events for a request
SELECT * FROM events WHERE correlation_id = 'abc-123';

-- Find all logs for a request
SELECT * FROM logs WHERE correlation_id = 'abc-123';

And for testing with request-level isolation:

// Consumer filters by correlation_id prefix or tenant ID
const headers = normalizeHeaders(msg.headers);
const tenantId = headers["x-tenant-id"];

if (process.env.ISOLATION_MODE && tenantId !== process.env.MY_TENANT) {
  // Skipped messages should still be acknowledged or forwarded
  // according to your retry/DLQ strategy to avoid backlogs
  return;
}

await withConsumerSpan(topic, consumerGroup, headers, messageMeta, async () => {
  await processMessage(msg);
});

Try It Yourself

Clone the demo and see context propagation in action:

git clone https://github.com/jagreehal/event-driven-observability-demo
cd event-driven-observability-demo
pnpm install
docker compose up -d  # Kafka, Jaeger
pnpm build && pnpm start

Trigger a checkout:

curl -sX POST http://localhost:3000/rpc/checkout/start \
  -H "content-type: application/json" \
  -d '{"cartValue":149.99,"itemCount":2,"userId":"usr_123","tenantId":"acme"}' | jq

Check the worker logs; you'll see correlation_id flowing through every service. Open Jaeger at http://localhost:16686 and search by correlation_id. All traces, all services, one query.

Evidence from a live demo run

Here's actual output from a demo run (2026-02-07):

API response (stable correlation_id returned immediately):

{ "ok": true, "correlation_id": "429a7e3408f6408f", "published": "v1.orders.created" }

Event with autotel enrichment (from analytics sink at /events):

{
  "name": "v1.checkouts.started",
  "autotel": {
    "correlation_id": "429a7e3408f6408f",
    "trace_id": "429a7e3408f6408f259348df534db2fc",
    "span_id": "557a33e70d8d3589",
    "trace_flags": "01",
    "trace_url": "http://localhost:16686/trace/429a7e3408f6408f259348df534db2fc"
  }
}

Same correlation_id in the API response and in every event; trace_url links directly to the Jaeger trace. One ID joins traces, events, and logs.

Repo: github.com/jagreehal/event-driven-observability-demo

testing autotel opentelemetry observability-series