Defining Pipelines
Prerequisites: Your First Pipeline, Key Concepts
Every NPipeline pipeline is a directed acyclic graph (DAG) of nodes connected by typed edges. You define the graph in a Define method, then hand it to a runner for execution.
The IPipelineDefinition Interface
The recommended approach is a class that implements IPipelineDefinition:
using NPipeline.Pipeline;
public class OrderPipeline : IPipelineDefinition
{
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<OrderSource, Order>("orders");
var validate = builder.AddTransform<ValidateOrder, Order, ValidatedOrder>("validate");
var sink = builder.AddSink<OrderSink, ValidatedOrder>("save");
builder.Connect(source, validate);
builder.Connect(validate, sink);
}
}Each Add* method returns a typed handle (SourceNodeHandle<T>, TransformNodeHandle<TIn, TOut>, SinkNodeHandle<TIn>). Handles carry type information so Connect can verify at compile time that the output type of one node matches the input type of the next.
Running a Pipeline
Use PipelineRunner to execute a definition:
var runner = PipelineRunner.Create();
await runner.RunAsync<OrderPipeline>();PipelineRunner.Create() builds a runner with default services. You can also pass a PipelineContext and cancellation token:
var context = PipelineContext.Default;
await runner.RunAsync<OrderPipeline>(context, cancellationToken);Or run a pre-instantiated definition (useful when the definition has constructor parameters):
var definition = new OrderPipeline(someConfig);
await runner.RunAsync(definition, context, cancellationToken);The PipelineBuilder API
Registering Nodes
| Method | Returns | Purpose |
|---|---|---|
AddSource<TNode, TOut>(name?) | SourceNodeHandle<TOut> | Register a source node |
AddTransform<TNode, TIn, TOut>(name?) | TransformNodeHandle<TIn, TOut> | Register an item-by-item transform |
AddStreamTransform<TNode, TIn, TOut>(name?) | TransformNodeHandle<TIn, TOut> | Register a stream-level transform |
AddSink<TNode, TIn>(name?) | SinkNodeHandle<TIn> | Register a sink node |
AddJoin<TNode, TIn1, TIn2, TOut>(name?) | JoinNodeHandle<TIn1, TIn2, TOut> | Register a join node |
AddAggregate<TNode, TIn, TKey, TResult>(name?) | AggregateNodeHandle<TIn, TResult> | Register an aggregate node |
Node names are optional but recommended - they appear in logs, metrics, and validation errors.
Connecting Nodes
builder.Connect(source, transform); // source output → transform input
builder.Connect(transform, sink); // transform output → sink inputConnect is type-safe. This won't compile:
var strings = builder.AddSource<StringSource, string>("strings");
var numbers = builder.AddTransform<DoubleIt, int, int>("double");
builder.Connect(strings, numbers); // Compile error: string ≠ intFan-out (One Source, Multiple Targets)
Call Connect multiple times from the same source:
builder.Connect(source, analytics);
builder.Connect(source, notifications);
builder.Connect(source, mainProcessor);Each downstream node receives its own copy of the stream.
Building Without Running
Use Build() to create a Pipeline object without executing it:
var pipeline = builder.Build(); // validates and returns PipelineFor validation without throwing, use TryBuild:
if (builder.TryBuild(out var pipeline, out var result))
{
// pipeline is valid
}
else
{
foreach (var error in result.Errors)
Console.WriteLine(error);
}Configuring Resilience in the Definition
The builder exposes configuration methods for error handling, retry, and circuit breakers directly in the definition:
public void Define(PipelineBuilder builder, PipelineContext context)
{
var source = builder.AddSource<OrderSource, Order>("orders");
var transform = builder.AddTransform<ProcessOrder, Order, Result>("process");
var sink = builder.AddSink<ResultSink, Result>("results");
builder.Connect(source, transform);
builder.Connect(transform, sink);
// Enable resilient execution on a specific node
transform.WithResilience(builder);
builder.WithRetryOptions(transform, new PipelineRetryOptions { MaxItemRetries = 3 });
// Pipeline-wide circuit breaker
builder.WithCircuitBreaker(failureThreshold: 10, openDuration: TimeSpan.FromSeconds(30));
}🔗 See also: Error Handling for the full resilience model.
Next Steps
- Lambda Nodes - define nodes inline without separate classes
- Custom Nodes - write your own source, transform, and sink nodes
- Pipeline Context - share state and configuration across nodes
- Dependency Injection - wire pipelines into a DI container
