Branching and Merging
Prerequisites: Defining Pipelines, Custom Nodes
Pipelines are directed acyclic graphs, not just linear chains. NPipeline supports fan-out (one source, multiple downstream paths), side-channel observation (taps), and fan-in (multiple sources merging into one node).
Fan-Out with Connect
The simplest fan-out is to call Connect from the same source handle to multiple targets:
public void Define(PipelineBuilder builder, PipelineContext context)
{
var orders = builder.AddSource<OrderSource, Order>("orders");
var analytics = builder.AddTransform<AnalyticsProcessor, Order, AnalyticsEvent>("analytics");
var fulfillment = builder.AddTransform<FulfillmentProcessor, Order, Shipment>("fulfillment");
var notifications = builder.AddTransform<NotificationProcessor, Order, Alert>("notify");
builder.Connect(orders, analytics);
builder.Connect(orders, fulfillment);
builder.Connect(orders, notifications);
}
Each downstream node receives its own independent copy of the stream. Items flow to all branches concurrently.
🔀 Need conditional fan-out? Use Routing with RouteNode when items should be sent to specific named outputs based on predicates.
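As a conceptual model of what "each downstream node receives its own stream" means, the following sketch broadcasts one source into several independent channels that are drained concurrently. It uses only System.Threading.Channels and is not NPipeline's implementation; `FanOutSketch`, `BroadcastAsync`, and `RunAsync` are hypothetical names introduced for illustration. The sketch hands the same item instance to every branch, and whether NPipeline copies items or shares references is not stated here.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Hypothetical helper, not an NPipeline API: writes every source item
    // to each branch channel, then closes all branches.
    public static async Task BroadcastAsync<T>(
        IAsyncEnumerable<T> source, IReadOnlyList<ChannelWriter<T>> branches)
    {
        try
        {
            await foreach (var item in source)
            {
                foreach (var branch in branches)
                {
                    await branch.WriteAsync(item);
                }
            }
        }
        finally
        {
            // Signal end-of-stream to every branch, even if the source failed.
            foreach (var branch in branches)
            {
                branch.TryComplete();
            }
        }
    }

    // Demo: fan a small array out to `branchCount` consumers and return
    // what each consumer observed.
    public static async Task<List<int>[]> RunAsync(int[] items, int branchCount)
    {
        var channels = Enumerable.Range(0, branchCount)
            .Select(_ => Channel.CreateUnbounded<int>())
            .ToArray();

        // Each consumer drains its own channel concurrently with the others.
        var consumers = channels.Select(async channel =>
        {
            var seen = new List<int>();
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                seen.Add(item);
            }
            return seen;
        }).ToArray();

        await BroadcastAsync(ToAsync(items), channels.Select(c => c.Writer).ToArray());
        return await Task.WhenAll(consumers);
    }

    private static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }
}
```

In this model every branch sees all items in source order, and a slow branch does not change what the other branches receive, because the channels are unbounded.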
Tap Nodes
A tap sends each item to a side-channel sink without affecting the main flow. The item passes through unchanged:
var source = builder.AddSource<OrderSource, Order>("orders");
var auditTap = builder.AddTap<Order>(
() => new AuditSink(logger), "audit");
var transform = builder.AddTransform<ProcessOrder, Order, Result>("process");
builder.Connect(source, auditTap);
builder.Connect(auditTap, transform); // items continue downstream
Taps are useful for logging, metrics collection, or debugging without altering the pipeline's data flow.
You can chain multiple taps:
builder.Connect(source, auditTap);
builder.Connect(auditTap, metricsTap);
builder.Connect(metricsTap, alertTap);
builder.Connect(alertTap, mainTransform);
Branch Nodes
A branch executes one or more side-effect handlers for each item, then passes the item through:
var branch = builder.AddBranch<Order>(async order =>
{
await SendNotificationAsync(order);
}, "notify-branch");
// Or with multiple handlers (all execute in parallel)
var multiBranch = builder.AddBranch<Order>(new Func<Order, Task>[]
{
async order => await LogAsync(order),
async order => await NotifyAsync(order),
async order => await UpdateMetricsAsync(order),
}, "multi-branch");
Error Handling in Branches
Configure how branch handler errors are handled:
| Mode | Behavior |
|---|---|
| RouteToErrorHandler (default) | Errors go through the resilience policy |
| CollectAndThrow | All errors collected and thrown as AggregateException |
| LogAndContinue | Errors logged and swallowed |
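To make the difference between CollectAndThrow and LogAndContinue concrete, here is a minimal standalone sketch of the two behaviors for a set of parallel handlers. It is an illustration under assumptions, not NPipeline code: `SketchErrorMode`, `BranchErrorSketch`, and `RunHandlersAsync` are hypothetical stand-ins, and RouteToErrorHandler is left out because it depends on the configured resilience policy.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the library's BranchErrorHandlingMode; only two modes are modeled.
public enum SketchErrorMode { CollectAndThrow, LogAndContinue }

public static class BranchErrorSketch
{
    // Hypothetical helper: runs all handlers for one item in parallel and
    // applies the chosen error mode once every handler has finished.
    public static async Task RunHandlersAsync<T>(
        T item,
        IReadOnlyList<Func<T, Task>> handlers,
        SketchErrorMode mode,
        Action<Exception> log)
    {
        // Start every handler before awaiting any of them.
        var tasks = handlers.Select(handler => Invoke(handler, item)).ToArray();

        var errors = new List<Exception>();
        foreach (var task in tasks)
        {
            try
            {
                await task;
            }
            catch (Exception ex)
            {
                errors.Add(ex);
            }
        }

        if (errors.Count == 0)
        {
            return;
        }

        if (mode == SketchErrorMode.CollectAndThrow)
        {
            // Surface every failure together.
            throw new AggregateException(errors);
        }

        // LogAndContinue: report each failure and let the item pass through.
        foreach (var ex in errors)
        {
            log(ex);
        }
    }

    // Wrapping the call in an async method turns a synchronous throw
    // inside a handler into a faulted task instead of an immediate exception.
    private static async Task Invoke<T>(Func<T, Task> handler, T item) => await handler(item);
}
```

With LogAndContinue the caller never observes the failures, so the log callback is the only place they surface. With CollectAndThrow one failing handler does not hide the others.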
Branch vs Tap
| Feature | Tap | Branch |
|---|---|---|
| Side channel | Full ISinkNode<T> | Func<T, Task> handlers |
| Error isolation | Sink manages its own errors | Configurable via BranchErrorHandlingMode |
| Use when | Sending to a persistent store | Lightweight side effects |
Fan-In: Merging Multiple Sources
Default Merge
When multiple sources connect to the same downstream node, items are interleaved in arrival order:
var nyse = builder.AddSource<NyseSource, Trade>("nyse");
var nasdaq = builder.AddSource<NasdaqSource, Trade>("nasdaq");
var processor = builder.AddTransform<TradeProcessor, Trade, ProcessedTrade>("process");
builder.Connect(nyse, processor);
builder.Connect(nasdaq, processor);
All inbound streams must share a single runtime item type. When item-level lineage is enabled, the runtime item type is LineagePacket<T>, so the merge operates on LineagePacket<Trade> streams and produces a merged LineagePacket<Trade> stream, preserving lineage context. No conversion or reflection is involved.
A mismatch between inbound stream types for a non-join node is a hard error.
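"Interleaved in arrival order" can be pictured with a small standalone sketch: one pump per source writes into a shared channel, so whichever source produces an item first is read first. This is a conceptual model built on `IAsyncEnumerable<T>` and System.Threading.Channels, not NPipeline's merge implementation; `InterleaveSketch` and its methods are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InterleaveSketch
{
    // Hypothetical helper: merges same-typed sources in arrival order.
    public static async IAsyncEnumerable<T> MergeAsync<T>(
        IReadOnlyList<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var merged = Channel.CreateUnbounded<T>();

        // One pump per source; each writes items as soon as its source yields them.
        var pumps = sources.Select(async source =>
        {
            await foreach (var item in source.WithCancellation(ct))
            {
                await merged.Writer.WriteAsync(item, ct);
            }
        }).ToArray();

        // Close the merged channel once every pump has finished,
        // forwarding any pump failure to the reader.
        _ = Task.WhenAll(pumps).ContinueWith(
            t => { merged.Writer.TryComplete(t.Exception); },
            TaskScheduler.Default);

        await foreach (var item in merged.Reader.ReadAllAsync(ct))
        {
            yield return item;
        }
    }

    // Demo helpers for trying the merge with in-memory data.
    public static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await Task.Yield();
            yield return item;
        }
    }

    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> stream)
    {
        var result = new List<T>();
        await foreach (var item in stream)
        {
            result.Add(item);
        }
        return result;
    }
}
```

The relative order of items from the same source is preserved, but the order across sources depends on timing, so downstream logic should not rely on a particular interleaving.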
Custom Merge
For control over how streams are combined, extend CustomMergeNode<T>:
public class PriorityMerge : CustomMergeNode<Trade>
{
public override async Task<IDataStream<Trade>> MergeAsync(
IEnumerable<IDataStream> pipes, CancellationToken ct)
{
// Custom merge logic - e.g., priority-based interleaving
var typedPipes = pipes.Cast<IDataStream<Trade>>();
return new DataStream<Trade>(
MergeByPriorityAsync(typedPipes, ct), "priority-merged");
}
}
Lineage note: When item-level lineage is enabled, the streams passed to MergeAsync are IDataStream<LineagePacket<Trade>>, not IDataStream<Trade>. If you write a custom merge node that operates on lineage-enabled pipelines, cast to IDataStream<LineagePacket<Trade>> or use the non-generic pipes parameter and handle both cases. The simplest approach is to use the default interleave merge unless you have a specific ordering requirement.
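The `MergeByPriorityAsync` helper used in the custom merge example is not shown in this page. One possible shape for priority-based interleaving is sketched below, under the assumption that each input is available as a `ChannelReader<T>` ordered from highest to lowest priority. Adapting it to `IDataStream<T>` is left open, since that type's enumeration API is not covered here; `PriorityMergeSketch` and `DemoAsync` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PriorityMergeSketch
{
    // Hypothetical helper: readers[0] has the highest priority. Whenever
    // several readers have items buffered, the lowest index is served first.
    public static async IAsyncEnumerable<T> MergeByPriorityAsync<T>(
        IReadOnlyList<ChannelReader<T>> readers,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var done = new bool[readers.Count];
        // One outstanding wait per reader, reused across rounds.
        var waits = new Task<bool>[readers.Count];

        while (true)
        {
            // Scan from the highest priority and take the first ready item.
            var taken = false;
            foreach (var reader in readers)
            {
                if (reader.TryRead(out var item))
                {
                    yield return item;
                    taken = true;
                    break;
                }
            }
            if (taken) continue;

            // Nothing buffered: find out which readers are finished and
            // which ones we still have to wait for.
            var pending = new List<Task<bool>>();
            var dataReady = false;
            for (var i = 0; i < readers.Count; i++)
            {
                if (done[i]) continue;
                var wait = waits[i] ??= readers[i].WaitToReadAsync(ct).AsTask();
                if (!wait.IsCompleted)
                {
                    pending.Add(wait);
                    continue;
                }
                waits[i] = null;
                if (await wait) dataReady = true; // data arrived; rescan
                else done[i] = true;              // reader is complete
            }

            if (dataReady) continue;
            if (pending.Count == 0) yield break;  // every reader is complete
            await Task.WhenAny(pending);
        }
    }

    // Demo: both channels are pre-filled, so the high-priority items drain first.
    public static async Task<List<int>> DemoAsync()
    {
        var high = Channel.CreateUnbounded<int>();
        var low = Channel.CreateUnbounded<int>();
        foreach (var x in new[] { 1, 2 }) high.Writer.TryWrite(x);
        foreach (var x in new[] { 10, 20 }) low.Writer.TryWrite(x);
        high.Writer.Complete();
        low.Writer.Complete();

        var result = new List<int>();
        await foreach (var item in MergeByPriorityAsync(new[] { high.Reader, low.Reader }))
        {
            result.Add(item);
        }
        return result;
    }
}
```

This variant never starves the high-priority stream, but a continuously busy high-priority source can starve the others. A weighted or round-robin scheme may be preferable if every source must make progress.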
Register the merge node as a preconfigured instance:
var mergeNode = new PriorityMerge();
var merge = builder.AddTransform<PriorityMerge, Trade, Trade>("merge");
builder.AddPreconfiguredNodeInstance("merge", mergeNode);
builder.Connect(nyse, merge);
builder.Connect(nasdaq, merge);
📝 Note: For combining different types from two sources, use Joins and Lookups instead.
Next Steps
- Routing with RouteNode - conditional fan-out with named route outputs
- Joins and Lookups - combine data from different stream types
- Batching and Windowing - group items by count or time
- Pipeline Composition - embed sub-pipelines as reusable units
