Dependency Injection
Prerequisites: Defining Pipelines
The NPipeline.Extensions.DependencyInjection package integrates NPipeline with Microsoft.Extensions.DependencyInjection. It handles node resolution, service lifetimes, and pipeline execution through the DI container.
Installation
dotnet add package NPipeline.Extensions.DependencyInjectionQuick Setup
The simplest approach scans assemblies for all NPipeline types:
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
services.AddNPipeline(Assembly.GetExecutingAssembly());
})
.Build();
await host.Services.RunPipelineAsync<OrderPipeline>();AddNPipeline(Assembly[]) auto-discovers and registers:
- All
INodeimplementations - All
IPipelineDefinitionimplementations - All
IResiliencePolicyimplementations - All
IDeadLetterSinkimplementations - All lineage sink implementations
Manual Registration
For explicit control, use the builder API:
services.AddNPipeline(builder =>
{
builder.AddNode<OrderSource>();
builder.AddNode<ValidateOrder>();
builder.AddNode<OrderSink>();
builder.AddPipeline<OrderPipeline>();
builder.AddResiliencePolicy<RetryOnTransientErrors>();
builder.AddDeadLetterSink<FileDeadLetterSink>();
});Mix both approaches - scan assemblies, then add individual registrations:
services.AddNPipeline(builder =>
{
builder.ScanAssemblies(Assembly.GetExecutingAssembly());
builder.AddNode<ExternalNode>(ServiceLifetime.Singleton);
});Service Lifetimes
All types default to Transient registration. Override the lifetime per type:
builder.AddNode<ExpensiveNode>(ServiceLifetime.Singleton);
builder.AddNode<CachedLookup>(ServiceLifetime.Scoped);| Lifetime | When to Use |
|---|---|
| Transient (default) | Stateless nodes, new instance per use |
| Scoped | Nodes that share state within a pipeline run |
| Singleton | Thread-safe nodes with expensive initialization |
⚠️ Warning: Singleton nodes must be thread-safe. If your node holds mutable state, use
TransientorScoped.
Running Pipelines
From IServiceProvider
await host.Services.RunPipelineAsync<MyPipeline>();
// With parameters
await host.Services.RunPipelineAsync<MyPipeline>(
new Dictionary<string, object> { ["date"] = DateTime.Today });RunPipelineAsync creates a DI scope, resolves the runner and all dependencies, sets context.DiOwnedNodes = true to prevent double-disposal, and executes the pipeline.
From an Injected Runner
Inject IPipelineRunner into your own services:
public class OrderService(IPipelineRunner runner)
{
public async Task ProcessOrdersAsync(CancellationToken ct)
{
var context = PipelineContext.Default;
await runner.RunAsync<OrderPipeline>(context, ct);
}
}Constructor Injection in Nodes
When nodes are resolved through DI, they can take constructor dependencies:
public class EnrichOrder : TransformNode<Order, EnrichedOrder>
{
private readonly IHttpClientFactory _httpFactory;
private readonly ILogger<EnrichOrder> _logger;
public EnrichOrder(IHttpClientFactory httpFactory, ILogger<EnrichOrder> logger)
{
_httpFactory = httpFactory;
_logger = logger;
}
public override async Task<EnrichedOrder> TransformAsync(
Order item, PipelineContext context, CancellationToken ct)
{
var client = _httpFactory.CreateClient();
_logger.LogInformation("Enriching order {OrderId}", item.Id);
var details = await client.GetFromJsonAsync<Details>($"/orders/{item.Id}", ct);
return new EnrichedOrder(item, details!);
}
}📝 Note: Without DI, nodes must have a parameterless constructor. The
NodeParameterlessConstructorAnalyzer(NP9403) warns about this at build time.
What Gets Registered
AddNPipeline registers these core services automatically:
| Service | Lifetime |
|---|---|
IPipelineRunner | Scoped |
PipelineBuilder | Transient |
IPipelineFactory | Singleton |
INodeFactory | Scoped |
INodeExecutor | Scoped |
ITopologyService | Scoped |
IErrorHandlingService | Transient |
Next Steps
- DI Extension Reference - registered services, node factory, and registry details
- Pipeline Context - pass runtime parameters and state
- Defining Pipelines - the builder API
- Parallel Execution - thread safety with DI-resolved nodes
