OpenTelemetry

The NPipeline.Extensions.Observability.OpenTelemetry package adds distributed tracing to NPipeline through OpenTelemetry. It wraps System.Diagnostics.ActivitySource to emit traces that can be exported to Jaeger, Zipkin, Azure Monitor, AWS X-Ray, or any other OpenTelemetry-compatible backend.

Installation

bash
dotnet add package NPipeline.Extensions.Observability.OpenTelemetry

Dependencies: OpenTelemetry 1.x, NPipeline.Extensions.Observability

Quick Start

csharp
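// 1. Register the tracer (observability services first, then the tracer)
services.AddNPipelineObservability();
services.AddOpenTelemetryPipelineTracer("MyService");

// 2. Configure the OpenTelemetry SDK
// Requires: using OpenTelemetry; using OpenTelemetry.Trace;
// The source name must match the name passed to AddOpenTelemetryPipelineTracer.
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    // Note: the OpenTelemetry.Exporter.Jaeger package is deprecated upstream;
    // Jaeger accepts OTLP directly, so AddOtlpExporter() is the maintained path.
    .AddJaegerExporter()
    .Build();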
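In a hosted application the provider is usually built and disposed by the host rather than held in a `using var`. The following is a minimal sketch of that variant, assuming the OpenTelemetry.Extensions.Hosting and OTLP exporter packages are referenced. It uses the SDK's own `AddSource` and assumes the ActivitySource name equals the service name passed to AddOpenTelemetryPipelineTracer; in a real application the AddNPipelineSource extension and the two NPipeline registrations from the Quick Start would take its place.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenTelemetry;
using OpenTelemetry.Trace;

var builder = Host.CreateApplicationBuilder(args);

// The NPipeline registrations from the Quick Start would go here:
// builder.Services.AddNPipelineObservability();
// builder.Services.AddOpenTelemetryPipelineTracer("MyService");

builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("MyService")   // assumption: source name == service name
        .AddOtlpExporter());      // defaults to the local OTLP endpoint

// The host builds the TracerProvider on start and disposes it on shutdown.
using var host = builder.Build();
await host.RunAsync();
```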

Key Types

OpenTelemetryPipelineTracer

Implements IPipelineTracer. Creates Activity instances that map to OpenTelemetry spans.

csharp
var tracer = new OpenTelemetryPipelineTracer("MyService");

// Start a trace (automatically establishes parent-child relationships)
using var activity = tracer.StartActivity("ProcessOrders");
activity.SetTag("pipeline.batch_size", 1000);

Each pipeline run creates a root activity. Node executions create child activities, forming a trace tree.
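The root and child relationship comes from System.Diagnostics itself: an activity started while another one is current becomes its child and shares its trace ID. The sketch below shows this with BCL types only. The class name `TraceTreeSketch` is illustrative, and the in-process `ActivityListener` stands in for the OpenTelemetry SDK; it is not NPipeline's implementation.

```csharp
using System.Diagnostics;

public static class TraceTreeSketch
{
    // Starts a root activity and one child, then reports how they are linked.
    public static (bool ChildHasRootAsParent, bool SameTraceId) Run()
    {
        using var source = new ActivitySource("MyService");

        // StartActivity returns null unless a listener samples the source.
        using var listener = new ActivityListener
        {
            ShouldListenTo = s => s.Name == "MyService",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded
        };
        ActivitySource.AddActivityListener(listener);

        using var root = source.StartActivity("OrderPipeline"); // pipeline run
        using var child = source.StartActivity("csv-source");   // node execution

        if (root is null || child is null)
        {
            return (false, false);
        }

        // The child was started while root was Activity.Current.
        return (ReferenceEquals(child.Parent, root), child.TraceId == root.TraceId);
    }
}
```

Calling `TraceTreeSketch.Run()` should report both links as present when no other activity is current on the calling thread.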

PipelineActivity

Sealed wrapper around System.Diagnostics.Activity that implements IPipelineActivity:

  • SetTag(key, value) - add structured metadata
  • RecordException(exception) - record errors on the span
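The source does not show how PipelineActivity implements RecordException. As a rough sketch, recording an error on a raw `Activity` typically means adding an `exception` event with the OpenTelemetry semantic-convention attributes and setting the status to Error. The helper name `ActivityErrorSketch` is hypothetical.

```csharp
using System;
using System.Diagnostics;

public static class ActivityErrorSketch
{
    // Attaches the exception as a span event and marks the span as failed.
    public static void RecordException(Activity activity, Exception ex)
    {
        var tags = new ActivityTagsCollection
        {
            { "exception.type", ex.GetType().FullName },
            { "exception.message", ex.Message },
            { "exception.stacktrace", ex.ToString() }
        };

        activity.AddEvent(new ActivityEvent("exception", tags: tags));
        activity.SetStatus(ActivityStatusCode.Error, ex.Message);
    }
}
```

`Activity.SetStatus` requires .NET 6 or later (or the equivalent System.Diagnostics.DiagnosticSource package version).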

TracerProviderBuilder Extensions

csharp
// Register a single pipeline source
builder.AddNPipelineSource("MyService");

// Register multiple pipeline sources
builder.AddNPipelineSources("OrderService", "InventoryService", "ShippingService");

Activity Inspection

csharp
// Extract NPipeline metadata from an Activity
var info = activity.GetNPipelineInfo();
// Returns: NPipelineActivityInfo? { PipelineId, NodeId, ... }

Trace Structure

A typical trace tree:

MyService: OrderPipeline
├── csv-source (SourceNode)
├── transform (TransformNode)
│   ├── retry-1 (if retries occurred)
│   └── retry-2
└── db-sink (SinkNode)

Activity Hierarchy

| Level | Activity | Tags |
|---|---|---|
| Root | Pipeline run | pipeline.name, pipeline.id, pipeline.run_id |
| Child | Node execution | node.id, node.type, node.kind |
| Grandchild | Retry attempt | retry.attempt, retry.reason |

Standard Tags

| Tag | Description | Example |
|---|---|---|
| pipeline.name | Pipeline definition name | "OrderPipeline" |
| pipeline.id | Pipeline type identifier | "order-pipeline" |
| pipeline.run_id | Unique run GUID | "a1b2c3d4-..." |
| node.id | Node identifier | "csv-source" |
| node.type | Node CLR type name | "CsvSourceNode" |
| node.kind | Node kind (Source/Transform/Sink) | "Transform" |
| node.items_processed | Items consumed | 1000 |
| node.items_emitted | Items produced | 950 |
| node.duration_ms | Execution time (ms) | 1234.5 |
| otel.status_code | OK or ERROR | "OK" |
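Because these tags live on the underlying `Activity`, they can be read back with `Activity.GetTagItem`, for example in a custom processor or a test. The sketch below computes the share of consumed items that a node did not emit. The helper `NodeTagSketch.DropRatio` is hypothetical, and it assumes the two item counters are stored as numeric values, as the Example column suggests.

```csharp
using System;
using System.Diagnostics;
using System.Globalization;

public static class NodeTagSketch
{
    // Returns (processed - emitted) / processed, or null when the tags are
    // missing or nothing was processed.
    public static double? DropRatio(Activity activity)
    {
        object processedTag = activity.GetTagItem("node.items_processed");
        object emittedTag = activity.GetTagItem("node.items_emitted");
        if (processedTag is null || emittedTag is null)
        {
            return null;
        }

        double processed = Convert.ToDouble(processedTag, CultureInfo.InvariantCulture);
        double emitted = Convert.ToDouble(emittedTag, CultureInfo.InvariantCulture);
        if (processed == 0)
        {
            return null;
        }

        return (processed - emitted) / processed;
    }
}
```

With the example values from the table (1000 consumed, 950 produced) the ratio works out to 50 / 1000, or 5%.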

Exporter Examples

Jaeger

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddJaegerExporter(o => o.AgentHost = "localhost")
    .Build();

Zipkin

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddZipkinExporter(o => o.Endpoint = new Uri("http://localhost:9411/api/v2/spans"))
    .Build();

Azure Monitor / Application Insights

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddAzureMonitorTraceExporter(o =>
        o.ConnectionString = "InstrumentationKey=...")
    .Build();

AWS X-Ray

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddXRayTraceId()
    .AddOtlpExporter()
    .Build();

Console (Development)

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddConsoleExporter()
    .Build();

Production Configuration

Sampling

In production, sample traces to reduce overhead:

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .SetSampler(new TraceIdRatioBasedSampler(0.1)) // 10% of traces
    .AddOtlpExporter(o => o.Endpoint = new Uri("http://collector:4317"))
    .Build();
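`TraceIdRatioBasedSampler` on its own decides from the trace ID and ignores whether an upstream caller already sampled the trace. Where pipelines are invoked from other traced services, wrapping it in `ParentBasedSampler` keeps traces whole: the ratio applies only to new root traces, and child spans follow the parent's decision. This is a sketch under the assumption that the ActivitySource name equals the service name, which is why it uses the SDK's `AddSource`; AddNPipelineSource would take its place in a real application.

```csharp
using System;
using OpenTelemetry;
using OpenTelemetry.Trace;

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("MyService") // assumption: source name == service name
    // 10% of new root traces; spans with a parent inherit the parent's decision.
    .SetSampler(new ParentBasedSampler(new TraceIdRatioBasedSampler(0.1)))
    .AddOtlpExporter(o => o.Endpoint = new Uri("http://collector:4317"))
    .Build();
```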

Batch Export

csharp
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddNPipelineSource("MyService")
    .AddOtlpExporter(o =>
    {
        o.Endpoint = new Uri("http://collector:4317");
        o.ExportProcessorType = ExportProcessorType.Batch;
        o.BatchExportProcessorOptions = new BatchExportProcessorOptions<Activity>
        {
            MaxQueueSize = 2048,
            ScheduledDelayMilliseconds = 5000,
            MaxExportBatchSize = 512
        };
    })
    .Build();

Multi-Service Setup

When multiple services each run NPipeline, register separate sources per service:

csharp
// Service A
services.AddOpenTelemetryPipelineTracer("OrderService");

// Service B
services.AddOpenTelemetryPipelineTracer("InventoryService");

Then register all sources in the SDK:

csharp
builder.AddNPipelineSources("OrderService", "InventoryService");

Parent-child relationships are preserved across service boundaries via W3C trace context propagation.
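W3C trace context travels between services as a `traceparent` header of the form `00-<trace-id>-<span-id>-<flags>`. HTTP and messaging instrumentation normally handles this automatically; the sketch below only illustrates the mechanism with BCL types. The class `TraceContextSketch` and its method names are hypothetical.

```csharp
using System.Diagnostics;

public static class TraceContextSketch
{
    // Builds the traceparent value: version "00", 32-hex trace id,
    // 16-hex span id, and 2-hex flags ("01" when the span is recorded).
    public static string ToTraceParent(Activity activity)
    {
        string flags = activity.Recorded ? "01" : "00";
        return $"00-{activity.TraceId.ToHexString()}-{activity.SpanId.ToHexString()}-{flags}";
    }

    // Parses an incoming header into a parent context; null if it is malformed.
    public static ActivityContext? FromTraceParent(string traceParent)
    {
        if (ActivityContext.TryParse(traceParent, null, out ActivityContext context))
        {
            return context;
        }

        return null;
    }
}
```

On the receiving side, the parsed context can be passed to `ActivitySource.StartActivity(name, kind, parentContext)` so the new activity joins the caller's trace.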

Troubleshooting

| Issue | Resolution |
|---|---|
| No traces appearing | Verify that the service name passed to AddOpenTelemetryPipelineTracer matches the one passed to AddNPipelineSource |
| Missing node spans | Ensure AddNPipelineObservability() is registered before the tracer |
| Exporter connection errors | Check the endpoint URL and network connectivity |
| Too many traces in production | Add SetSampler with TraceIdRatioBasedSampler |
| Large trace payloads | Use batch export with appropriate queue and batch sizes |
| Activities not correlated | Verify Activity.Current is not null where the child activity starts; avoid starting work with ExecutionContext flow suppressed (for example ExecutionContext.SuppressFlow() or ThreadPool.UnsafeQueueUserWorkItem) |
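For the "Activities not correlated" case, it helps to know that `Activity.Current` is carried by the ExecutionContext. A plain `Task.Run` captures that context, so the ambient activity is visible inside the delegate; it is lost only when flow is suppressed. A small BCL-only sketch (the class name `ActivityFlowSketch` is illustrative):

```csharp
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class ActivityFlowSketch
{
    // Reports whether the ambient activity was visible inside background work,
    // first with normal flow and then with flow suppressed.
    public static async Task<(bool VisibleWithFlow, bool VisibleWhenSuppressed)> RunAsync()
    {
        using var parent = new Activity("ProcessOrders").Start();

        // Task.Run captures the ExecutionContext, so Activity.Current flows in.
        Task<bool> withFlow = Task.Run(() => ReferenceEquals(Activity.Current, parent));

        // With flow suppressed, the delegate runs without the ambient activity.
        // The suppression is undone on the same thread, before any await.
        Task<bool> suppressed;
        using (ExecutionContext.SuppressFlow())
        {
            suppressed = Task.Run(() => Activity.Current != null);
        }

        return (await withFlow, await suppressed);
    }
}
```

Any child activity started inside the suppressed delegate would begin a new trace instead of joining the pipeline's trace.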

Debug Logging

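When diagnosing issues, set the context propagators explicitly, then enable the SDK's internal logging:

csharp
// Propagate W3C trace context and baggage.
// Sdk is in the OpenTelemetry namespace; the propagator types are in
// OpenTelemetry.Context.Propagation. The array needs an explicit element type.
Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[]
{
    new TraceContextPropagator(),
    new BaggagePropagator()
}));

// Enable self-diagnostics.
// Note: the OpenTelemetry .NET SDK documents self-diagnostics through an
// OTEL_DIAGNOSTICS.json file in the process working directory; confirm that
// your SDK version honors this environment variable before relying on it.
Environment.SetEnvironmentVariable("OTEL_DIAGNOSTICS_ENABLED", "true");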

Released under the MIT License.