Architecture Overview
This page is a map of NPipeline's internals. Use it to orient yourself before diving into specific subsystems.
Component Diagram
flowchart TB
subgraph User Code
PD[IPipelineDefinition]
end
subgraph Pipeline Construction
PB[PipelineBuilder]
PG[PipelineGraph]
ND[NodeDefinition]
end
subgraph Execution
PR[PipelineRunner]
PEO[PipelineExecutionOrchestrator]
NF[INodeFactory]
NE[INodeExecutor]
TS[ITopologyService]
ES[IExecutionStrategy]
end
subgraph Data Flow
DS[IDataStream<T>]
SN[ISourceNode<T>]
TN[ITransformNode<TIn,TOut>]
SK[ISinkNode<T>]
end
subgraph Cross-Cutting
PC[PipelineContext]
RP[IResiliencePolicy]
OB[IObservabilitySurface]
LN[ILineage]
end
PD -->|Define| PB
PB -->|Build| PG
PG -->|contains| ND
PR -->|orchestrates| PEO
PEO -->|uses| NF
PEO -->|uses| NE
PEO -->|uses| TS
NE -->|delegates to| ES
ES -->|processes| DS
SN -->|produces| DS
TN -->|transforms| DS
SK -->|consumes| DS
PC -.->|flows through| SN
PC -.->|flows through| TN
PC -.->|flows through| SK
RP -.->|consulted by| ES
OB -.->|observes| PEO
LN -.->|records| PEO
Major Subsystems
Pipeline Construction (NPipeline.Pipeline, NPipeline.Graph)
Users define pipelines by implementing IPipelineDefinition.Define(PipelineBuilder, PipelineContext). The builder collects node registrations and connections, then Build() produces a PipelineGraph - an immutable directed acyclic graph of NodeDefinition records connected by typed Edge objects.
Key types:
- PipelineBuilder - fluent API (split across partial classes: .Build.cs, .Configuration.cs, etc.)
- PipelineGraph / PipelineGraphBuilder - immutable graph with validation
- NodeDefinition - record holding node metadata (ID, type, kind, input/output types, execution strategy, merge strategy, lineage adapters)
- NodeKind - enum: Source, Transform, StreamTransform, Tap, Branch, Lookup, Batch, Sink, Join, Aggregate, Composite, CompositeInput, CompositeOutput
Execution (NPipeline.Execution)
PipelineRunner is the main entry point. It delegates to PipelineExecutionOrchestrator, which coordinates the full execution lifecycle:
- Binding - IRuntimePipelineBinder normalizes the graph before execution: applies option overrides and writes a RuntimeNodeStreamContract for every node into the execution annotation bag
- Setup - instantiate nodes via INodeFactory, resolve execution plans
- Topology - ITopologyService computes topological order from the graph
- Node execution - INodeExecutor executes each node in order, using IExecutionStrategy for transforms; before passing input streams to a non-join node, it validates stream item types against the node's RuntimeNodeStreamContract
- Lineage - ILineage records data provenance if enabled
- Cleanup - dispose nodes, streams, and context resources
Key types:
- PipelineRunner / PipelineRunnerBuilder - public entry points
- PipelineExecutionOrchestrator - internal orchestration
- IRuntimePipelineBinder / RuntimePipelineBinder - bind-time graph normalization and contract annotation
- RuntimeNodeStreamContract - per-node record of effective input/output item types and lineage state
- ExecutionAnnotationKeys - registry of annotation bag keys used across the execution layer
- IExecutionStrategy - controls how a transform processes its input stream
- NodeExecutionPlan - pre-built execution plan for optimized dispatch
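To make the Topology step concrete, here is a minimal sketch of computing a topological order with Kahn's algorithm. It is not NPipeline's ITopologyService implementation: TopologySketch, its Order method, and the use of string node IDs with (From, To) tuples for edges are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for illustration; not NPipeline's ITopologyService.
public static class TopologySketch
{
    // Returns node IDs ordered so that every node appears after all of its upstream nodes.
    public static IReadOnlyList<string> Order(
        IEnumerable<string> nodeIds,
        IEnumerable<(string From, string To)> edges)
    {
        // Number of incoming edges per node, and the downstream neighbours of each node.
        var inDegree = nodeIds.ToDictionary(id => id, _ => 0);
        var downstream = inDegree.Keys.ToDictionary(id => id, _ => new List<string>());

        foreach (var (from, to) in edges)
        {
            downstream[from].Add(to);
            inDegree[to]++;
        }

        // Nodes without inputs (sources) are ready immediately.
        var ready = new Queue<string>(inDegree.Where(p => p.Value == 0).Select(p => p.Key));
        var ordered = new List<string>();

        while (ready.Count > 0)
        {
            var current = ready.Dequeue();
            ordered.Add(current);

            // A downstream node becomes ready once all of its inputs have been ordered.
            foreach (var next in downstream[current])
            {
                if (--inDegree[next] == 0)
                    ready.Enqueue(next);
            }
        }

        // Any node left unordered is part of a cycle.
        if (ordered.Count != inDegree.Count)
            throw new InvalidOperationException("Graph contains a cycle.");

        return ordered;
    }
}
```

Calling TopologySketch.Order with the nodes source, transform, and sink and the edges source to transform and transform to sink yields the nodes in that order, whatever order they were registered in. The cycle check reflects the requirement that the graph be acyclic; how and when NPipeline enforces that is not shown here.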
Data Flow (NPipeline.DataFlow)
Data flows between nodes as IDataStream<T>, which extends IAsyncEnumerable<T>. Streams are lazy by default - items are pulled on demand.
Key types:
- IDataStream<T> - typed async stream
- IForwardOnlyDataStream - marker for streams that cannot be replayed
- InMemoryDataStream<T> - buffered collection
- DataStream<T> - wraps IAsyncEnumerable<T>
- CappedReplayableDataStream<T> - bounded replay buffer for materialization
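The pull-based laziness described above can be seen with a plain IAsyncEnumerable<T>, which stands in for IDataStream<T> in this sketch. LazyStreamSketch, Source, TakeAsync, and the Produced counter are hypothetical names used only for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Plain IAsyncEnumerable<T> stands in for IDataStream<T>; all names here are illustrative.
public static class LazyStreamSketch
{
    // Counts how many items the source has actually produced so far.
    public static int Produced;

    // An async iterator: its body only runs as the consumer asks for the next item.
    public static async IAsyncEnumerable<int> Source(int count)
    {
        for (var i = 0; i < count; i++)
        {
            Produced++;
            await Task.Yield();
            yield return i;
        }
    }

    // Pulls at most `take` items from the stream, then stops enumerating.
    public static async Task<List<int>> TakeAsync(IAsyncEnumerable<int> stream, int take)
    {
        var items = new List<int>();
        if (take <= 0) return items;

        await foreach (var item in stream)
        {
            items.Add(item);
            if (items.Count == take) break;
        }
        return items;
    }
}
```

If a consumer takes three items from Source(1000), the Produced counter ends at three: the remaining items are never generated because nothing pulls them.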
Nodes (NPipeline.Nodes)
All nodes implement INode (marker interface extending IAsyncDisposable). The three primary interfaces:
- ISourceNode<TOut> - OpenStream(context, ct) → IDataStream<TOut>
- ITransformNode<TIn, TOut> - TransformAsync(item, context, ct) → Task<TOut>
- ISinkNode<TIn> - ConsumeAsync(input, context, ct) → Task
Additional interfaces: IStreamTransformNode<TIn, TOut>, IAggregateNode, IJoinNode, IBranchNode, ILookupNode, IBatchNode, ICompositeNode.
Resilience (NPipeline.Resilience, NPipeline.ErrorHandling)
IResiliencePolicy makes all failure decisions. ResilientExecutionStrategy wraps a base strategy with retry, circuit breaker, and dead-letter support. ResiliencePolicyBuilder provides the fluent API.
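The wrapping idea can be illustrated with a generic decorator that adds retry to any asynchronous operation. This is only a sketch of the pattern, not ResilientExecutionStrategy itself: RetryWrapperSketch and WithRetry are hypothetical names, the retry decision is a fixed attempt count rather than an IResiliencePolicy, and circuit breaking and dead-lettering are left out.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical decorator for illustration; not NPipeline's ResilientExecutionStrategy.
public static class RetryWrapperSketch
{
    // Wraps an operation so that it is attempted up to maxAttempts times.
    // The exception from the final attempt is not caught and propagates to the caller.
    public static Func<Task<T>> WithRetry<T>(Func<Task<T>> operation, int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        return async () =>
        {
            for (var attempt = 1; ; attempt++)
            {
                try
                {
                    return await operation();
                }
                catch (Exception) when (attempt < maxAttempts)
                {
                    // Attempts remain: swallow the failure and loop to try again.
                }
            }
        };
    }
}
```

The wrapped delegate has the same shape as the original, so callers do not need to know whether retry is applied. That is the property that lets a resilient strategy stand in for a base strategy.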
Configuration (NPipeline.Configuration)
Immutable record types: PipelineRetryOptions, PipelineCircuitBreakerOptions, ErrorHandlingConfiguration, LineageOptions, AggregateNodeConfiguration<T>. To change a value, use a C# with expression, which returns a modified copy and leaves the original instance untouched.
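As a sketch of how modification works with immutable records, the example below uses a hypothetical RetryOptionsSketch record. Its MaxAttempts and Delay properties are assumptions for illustration and are not the actual members of PipelineRetryOptions.

```csharp
using System;

// Hypothetical record for illustration; the property names are assumptions,
// not the actual members of PipelineRetryOptions.
public sealed record RetryOptionsSketch(int MaxAttempts, TimeSpan Delay);

public static class RetryOptionsExample
{
    // A shared default instance that is safe to hand out because records of this shape are immutable.
    public static RetryOptionsSketch Default { get; } =
        new RetryOptionsSketch(MaxAttempts: 3, Delay: TimeSpan.FromSeconds(1));

    // Returns a modified copy; the instance passed in is left unchanged.
    public static RetryOptionsSketch WithMoreAttempts(RetryOptionsSketch options, int attempts) =>
        options with { MaxAttempts = attempts };
}
```

Calling WithMoreAttempts(Default, 5) produces a new instance with MaxAttempts set to 5 and the same Delay, while Default still reports 3.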
Observability (NPipeline.Observability)
IObservabilitySurface receives lifecycle events (pipeline started/completed, node started/completed). Implementations push to logging, metrics, and tracing systems.
Lineage (NPipeline.Lineage)
ILineage builds lineage adapters at construction time and records provenance events during execution. Configurable via LineageOptions.
Dependency Direction
Dependencies flow inward. The core NPipeline package has no dependency on extension packages. Extensions depend on the core.
Extensions → Core ← Connectors
               ↑
       Storage Providers
The core is self-contained: pipeline construction, execution, data flow, resilience, and configuration. Extensions add DI, parallelism, composition, testing, observability, and lineage.
Next Steps
- Design Principles - why the architecture is shaped this way
- Execution Model - deep dive into how the runner works
- Data Flow Internals - stream and pipe implementation details
