Pipeline Context

Prerequisites: Defining Pipelines

PipelineContext is the shared object that every node receives during execution. It carries runtime parameters, shared state, cancellation tokens, and framework services.

Creating a Context

csharp
// Default context with no configuration
var defaultContext = PipelineContext.Default;

// Context with parameters
var parameterizedContext = new PipelineContext(
    PipelineContextConfiguration.WithParameters(new Dictionary<string, object>
    {
        ["region"] = "us-east-1",
        ["batchDate"] = DateTime.Today
    }));

// Context with cancellation
var cancellableContext = new PipelineContext(
    PipelineContextConfiguration.WithCancellation(cancellationToken));

Pass the context when running:

csharp
await runner.RunAsync<MyPipeline>(context, cancellationToken);

Three Dictionaries

PipelineContext exposes three IDictionary<string, object> collections with different purposes:

| Dictionary | Purpose | Set By | Read By |
|---|---|---|---|
| Parameters | Runtime inputs (file paths, dates, config values) | Caller before execution | Nodes during execution |
| Items | Node-to-node shared state | Any node during execution | Any downstream node |
| Properties | Extension/plugin storage | Extensions and framework | Extensions and framework |

Using Parameters

Set parameters before running, read them in nodes:

csharp
// Set before execution
context.Parameters["inputPath"] = "/data/orders.csv";

// Read in a node
public override IDataStream<Order> OpenStream(
    PipelineContext context, CancellationToken ct)
{
    var path = (string)context.Parameters["inputPath"];
    return new DataStream<Order>(ReadCsvAsync(path, ct), "orders");
}
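
The direct cast above throws a bare KeyNotFoundException or InvalidCastException when a parameter is missing or has the wrong type. A small lookup helper can make those failures easier to diagnose. The sketch below is a minimal illustration: GetRequired is a hypothetical helper, not part of the library, and a plain Dictionary stands in for context.Parameters.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not a library API): fetch a required parameter and
// fail with a descriptive message if it is missing or has the wrong type.
static T GetRequired<T>(IDictionary<string, object> parameters, string key)
{
    if (!parameters.TryGetValue(key, out var value))
        throw new KeyNotFoundException($"Missing required parameter '{key}'.");
    if (value is not T typed)
        throw new InvalidCastException(
            $"Parameter '{key}' is {value?.GetType().Name ?? "null"}, expected {typeof(T).Name}.");
    return typed;
}

// Stand-in for context.Parameters, which is an IDictionary<string, object>.
IDictionary<string, object> parameters = new Dictionary<string, object>
{
    ["inputPath"] = "/data/orders.csv",
    ["batchDate"] = DateTime.Today
};

string path = GetRequired<string>(parameters, "inputPath");
DateTime batchDate = GetRequired<DateTime>(parameters, "batchDate");
Console.WriteLine($"Reading {path} for {batchDate:yyyy-MM-dd}");
```

Inside a node, the same helper could be called as GetRequired<string>(context.Parameters, "inputPath").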

Sharing State Between Nodes

Use Items for node-to-node communication:

csharp
// In a transform node: store a computed value
public override Task<Order> TransformAsync(
    Order item, PipelineContext context, CancellationToken ct)
{
    var count = context.Items.TryGetValue("orderCount", out var c) ? (int)c : 0;
    context.Items["orderCount"] = count + 1;
    return Task.FromResult(item);
}

// In a later sink: read the value
public override async Task ConsumeAsync(
    IDataStream<Order> input, PipelineContext context, CancellationToken ct)
{
    await foreach (var order in input.WithCancellation(ct)) { /* ... */ }
    // The key is absent when no orders passed through the transform
    var total = context.Items.TryGetValue("orderCount", out var c) ? (int)c : 0;
    Console.WriteLine($"Processed {total} orders");
}

Thread Safety: In the Default optimization profile, Parameters, Items, and Properties are backed by ConcurrentDictionary and support concurrent reads and writes. Individual operations are thread-safe, but a read followed by a write (as in the orderCount counter above) is not atomic, so concurrent increments can be lost. In HighThroughput mode, the collections are plain Dictionary instances with no locking overhead and no thread safety. For complex shared state in parallel execution, use IPipelineStateManager.
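
To illustrate the difference between thread-safe operations and atomic updates, the sketch below increments a counter from many threads using ConcurrentDictionary.AddOrUpdate, which retries on contention so that no increment is lost. It works on a standalone ConcurrentDictionary as a stand-in for Items. Because Items is exposed as IDictionary<string, object>, AddOrUpdate is not available through the interface, and whether the instance can be cast to ConcurrentDictionary is an assumption this page does not confirm.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Stand-in for context.Items under the Default profile
// (assumed here to be a ConcurrentDictionary<string, object>).
var items = new ConcurrentDictionary<string, object>();

// 10,000 increments issued in parallel. AddOrUpdate inserts 1 if the key is
// absent, otherwise applies the update delegate and retries if another
// thread changed the value in the meantime.
Parallel.For(0, 10_000, i =>
{
    items.AddOrUpdate("orderCount", 1, (key, current) => (int)current + 1);
});

int total = (int)items["orderCount"];
Console.WriteLine(total); // prints 10000
```

If the dictionary type cannot be relied on, keeping the counter in a private field of the node and updating it with Interlocked.Increment avoids the shared dictionary altogether.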

Accessing Framework Services

PipelineContext also exposes framework services for observability, error handling, and lineage:

| Property | Type | Description |
|---|---|---|
| CancellationToken | CancellationToken | Pipeline-wide cancellation |
| PipelineId | Guid | Unique ID for this pipeline definition |
| RunId | Guid | Unique ID for this execution run |
| PipelineName | string? | Human-readable name |
| PipelineStartTimeUtc | DateTime | When execution started |
| LoggerFactory | ILoggerFactory | For creating loggers in nodes |
| DeadLetterSink | IDeadLetterSink? | For routing failed items |
| GlobalRetryOptions | PipelineRetryOptions | Pipeline-wide retry configuration |

Configuring the Context

PipelineContextConfiguration provides factory methods for common setups. To combine several settings in one configuration, construct it directly with named arguments:

csharp
// Combine multiple configurations
var config = new PipelineContextConfiguration(
    Parameters: new Dictionary<string, object> { ["key"] = "value" },
    LoggerFactory: loggerFactory,
    CancellationToken: cancellationToken);

var context = new PipelineContext(config);

Available factory methods:

| Method | Purpose |
|---|---|
| WithParameters(dict) | Set runtime parameters |
| WithCancellation(token) | Set cancellation token |
| WithLogging(loggerFactory) | Configure logging |
| WithRetry(retryOptions) | Set retry configuration |
| WithResilience(policy) | Set resilience policy |
| WithErrorHandling(deadLetterSink?) | Configure error handling |
| WithObservability(loggerFactory?, tracer?) | Configure observability |

Released under the MIT License.