
Parallelism

The NPipeline.Extensions.Parallelism package provides parallel execution strategies for transform nodes. Configure the degree of parallelism, backpressure policy, queue bounds, and whether output order is preserved.

Installation

bash
dotnet add package NPipeline.Extensions.Parallelism

Quick Start

csharp
var source = builder.AddSource(...);
var transform = builder.AddTransform<MyNode, TIn, TOut>("transform");
var sink = builder.AddSink(...);

builder.Connect(source, transform);
builder.Connect(transform, sink);

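builder.WithParallelOptions(transform, new ParallelOptions
{
    MaxDegreeOfParallelism = 8,             // at most 8 concurrent workers
    MaxQueueLength = 1000,                  // at most 1000 items in flight
    QueuePolicy = BoundedQueuePolicy.Block, // backpressure: the producer waits instead of dropping
    PreserveOrdering = true                 // emit results in input order (the default)
});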

Execution Strategies

All strategies are built on System.Threading.Channels and process items with a fixed pool of worker tasks.

| Strategy | Behavior |
|---|---|
| ParallelExecutionStrategy (default) | Facade that selects the concrete strategy based on the configured queue policy |
| BlockingParallelStrategy | Blocking backpressure: pauses the producer when the in-flight window is full; restores input order by default |
| DropNewestParallelStrategy | Discards incoming items when the queue is full |
| DropOldestParallelStrategy | Discards the oldest queued items to make room for new ones |
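The channel-plus-worker-pool design described above can be sketched with plain .NET primitives. This is a minimal illustration, not NPipeline's source. ProcessAsync and the doubling "transform" are hypothetical. BoundedChannelFullMode.Wait stands in for blocking backpressure, and results are collected in completion order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var doubled = await ProcessAsync(Enumerable.Range(1, 20), workerCount: 4, capacity: 8);
Console.WriteLine(string.Join(", ", doubled.OrderBy(x => x)));

// Hypothetical helper: a fixed pool of workers drains one bounded input channel.
static async Task<int[]> ProcessAsync(IEnumerable<int> items, int workerCount, int capacity)
{
    var input = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity)
    {
        FullMode = BoundedChannelFullMode.Wait // WriteAsync waits while the channel is full
    });
    var results = new ConcurrentBag<int>();

    // Each worker reads until the channel is completed and drained.
    var workers = Enumerable.Range(0, workerCount)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (var item in input.Reader.ReadAllAsync())
            {
                results.Add(item * 2); // stand-in for the real transform
            }
        }))
        .ToArray();

    // Producer: awaiting WriteAsync is what applies backpressure.
    foreach (var item in items)
    {
        await input.Writer.WriteAsync(item);
    }
    input.Writer.Complete(); // lets ReadAllAsync finish once the queue is empty

    await Task.WhenAll(workers);
    return results.ToArray();
}
```

The producer awaits WriteAsync, so it stops pulling from its source whenever eight items are already queued. That pause is the core of blocking backpressure.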

Configuration

ParallelOptions

| Property | Type | Default | Description |
|---|---|---|---|
| MaxDegreeOfParallelism | int? | null (processor count) | Maximum number of concurrent workers |
| MaxQueueLength | int? | null (unbounded) | Bound on total in-flight items (queued + processing + buffered) |
| QueuePolicy | BoundedQueuePolicy | Block | What happens when the queue is full |
| OutputBufferCapacity | int? | null | Output buffer capacity for end-to-end throttling |
| PreserveOrdering | bool | true | Maintain input order in the output via a reorder buffer |
| MetricsInterval | TimeSpan? | null (1 second) | Metrics reporting interval |
| EnableInputWaitTiming | bool | false | Opt-in per-item input-wait timing attribution |
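MaxQueueLength bounds every in-flight item, not just the input queue. One way to picture such a window is a semaphore that an item acquires when it is admitted and releases only when its result leaves the node. This is a hypothetical sketch using plain .NET primitives. RunWindowedAsync and its parameters are illustrative names, and we do not claim NPipeline implements the window this way.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var (count, maxSeen) = await RunWindowedAsync(itemCount: 100, maxInFlight: 8, workerCount: 4);
Console.WriteLine($"processed={count}, peak in-flight={maxSeen}");

// Hypothetical sketch: one semaphore counts an item from the moment the producer
// admits it until the consumer takes its result, so queued, processing, and
// buffered items all share the same maxInFlight limit.
static async Task<(int Processed, int PeakInFlight)> RunWindowedAsync(int itemCount, int maxInFlight, int workerCount)
{
    using var window = new SemaphoreSlim(maxInFlight, maxInFlight);
    var input = Channel.CreateUnbounded<int>();
    var output = Channel.CreateUnbounded<int>();
    var inFlight = 0;
    var peak = 0; // written only by the producer task

    var producer = Task.Run(async () =>
    {
        for (var i = 0; i < itemCount; i++)
        {
            await window.WaitAsync(); // the producer pauses while the window is full
            peak = Math.Max(peak, Interlocked.Increment(ref inFlight));
            await input.Writer.WriteAsync(i);
        }
        input.Writer.Complete();
    });

    var workers = new Task[workerCount];
    for (var w = 0; w < workerCount; w++)
    {
        workers[w] = Task.Run(async () =>
        {
            await foreach (var item in input.Reader.ReadAllAsync())
            {
                await output.Writer.WriteAsync(item * 2); // stand-in for the transform
            }
        });
    }

    var closeOutput = Task.Run(async () =>
    {
        await Task.WhenAll(workers);
        output.Writer.Complete(); // no more results once every worker has finished
    });

    var processed = 0;
    await foreach (var result in output.Reader.ReadAllAsync())
    {
        processed++;
        Interlocked.Decrement(ref inFlight);
        window.Release(); // the item has left the window
    }

    await Task.WhenAll(producer, closeOutput);
    return (processed, peak);
}
```

Both input channels are unbounded here. The semaphore alone keeps the combined count of queued, processing, and unconsumed items at or below maxInFlight.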

Workload Type Presets

For common workload patterns, use RunParallel with a preset to apply predefined settings for that workload type:

csharp
builder
    .AddTransform<MyTransform, Input, Output>()
    .RunParallel(builder, ParallelWorkloadType.IoBound);

| Workload type | DOP | Queue | Buffer | Best for |
|---|---|---|---|---|
| General (default) | ProcessorCount × 2 | ProcessorCount × 4 | ProcessorCount × 8 | Mixed CPU and I/O |
| CpuBound | ProcessorCount | ProcessorCount × 2 | ProcessorCount × 4 | Math, parsing, compression |
| IoBound | ProcessorCount × 4 | ProcessorCount × 8 | ProcessorCount × 16 | File I/O, database calls |
| NetworkBound | min(ProcessorCount × 8, 100) | 200 | 400 | HTTP, remote services |
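To see what a preset resolves to on a particular machine, the table can be restated as a small function. The helper name Preset and its string keys are hypothetical. NPipeline resolves ParallelWorkloadType internally and may compute the values differently.

```csharp
using System;

foreach (var type in new[] { "General", "CpuBound", "IoBound", "NetworkBound" })
{
    var (dop, queue, buffer) = Preset(type, Environment.ProcessorCount);
    Console.WriteLine($"{type}: DOP={dop}, queue={queue}, buffer={buffer}");
}

// Hypothetical helper that restates the preset table for a given processor count.
static (int Dop, int Queue, int Buffer) Preset(string workloadType, int processorCount) => workloadType switch
{
    "General" => (processorCount * 2, processorCount * 4, processorCount * 8),
    "CpuBound" => (processorCount, processorCount * 2, processorCount * 4),
    "IoBound" => (processorCount * 4, processorCount * 8, processorCount * 16),
    "NetworkBound" => (Math.Min(processorCount * 8, 100), 200, 400), // DOP capped at 100
    _ => throw new ArgumentOutOfRangeException(nameof(workloadType), workloadType, "Unknown workload type")
};
```

On an 8-core machine, IoBound gives DOP 32, queue 64, and buffer 128. NetworkBound gives DOP min(64, 100) = 64 with a fixed queue of 200 and buffer of 400. From 13 cores upward, the NetworkBound DOP stays at the cap of 100.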

Builder API

For fine-grained control beyond presets:

csharp
builder
    .AddTransform<MyTransform, Input, Output>()
    .RunParallel(builder, opt => opt
        .MaxDegreeOfParallelism(8)
        .MaxQueueLength(100)
        .DropOldestOnBackpressure()
        .OutputBufferCapacity(50)
        .AllowUnorderedOutput()
        .MetricsInterval(TimeSpan.FromSeconds(2)));

ParallelOptionsBuilder methods:

| Method | Description |
|---|---|
| MaxDegreeOfParallelism(int) | Set the number of concurrent workers |
| MaxQueueLength(int) | Set the bound on in-flight items (ParallelOptions.MaxQueueLength) |
| BlockOnBackpressure() | Block the producer when full (default) |
| DropOldestOnBackpressure() | Discard the oldest queued items |
| DropNewestOnBackpressure() | Discard incoming items |
| OutputBufferCapacity(int) | Limit the output buffer size |
| AllowUnorderedOutput() | Disable order preservation |
| EnableInputWaitTiming() | Opt in to per-item input-wait timing |
| MetricsInterval(TimeSpan) | Set the metrics reporting interval |

Comparison: Configuration Methods

| Approach | Lines | Best for |
|---|---|---|
| Preset API: .RunParallel(builder, ParallelWorkloadType.IoBound) | 1 | Common patterns, prototyping |
| Builder API: .RunParallel(builder, opt => opt.MaxDegreeOfParallelism(8)) | 2–3 | Custom needs |
| Manual: builder.WithParallelOptions(handle, new ParallelOptions { ... }) | 5–6 | Advanced tuning |

Queue Policies

| Policy | When full | Use case |
|---|---|---|
| Block | Producer waits | Most pipelines; ensures no data loss |
| DropNewest | Incoming item is discarded | Load shedding where items already queued take priority over new arrivals |
| DropOldest | Oldest queued item is discarded | Real-time streams where the freshest data matters most |
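The two drop policies map onto .NET bounded-channel modes, which makes their effect easy to check. The names differ, however. This package's DropNewest discards the incoming item, which corresponds to BoundedChannelFullMode.DropWrite. The .NET mode BoundedChannelFullMode.DropNewest instead evicts the most recently queued item. Whether NPipeline uses these modes internally is an assumption, and FillAndDrain is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Write 1..5 into a channel of capacity 3, then drain what survived.
Console.WriteLine(string.Join(",", FillAndDrain(BoundedChannelFullMode.DropWrite, 3, 5)));  // prints 1,2,3 (incoming items dropped)
Console.WriteLine(string.Join(",", FillAndDrain(BoundedChannelFullMode.DropOldest, 3, 5))); // prints 3,4,5 (oldest items evicted)

static List<int> FillAndDrain(BoundedChannelFullMode mode, int capacity, int count)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity) { FullMode = mode });
    for (var i = 1; i <= count; i++)
    {
        // In the drop modes TryWrite never waits; when the channel is full it
        // discards either the incoming item or a queued one, depending on the mode.
        channel.Writer.TryWrite(i);
    }
    channel.Writer.Complete();

    var drained = new List<int>();
    while (channel.Reader.TryRead(out var item))
    {
        drained.Add(item);
    }
    return drained;
}
```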

When item-level lineage is enabled and LineageOptions.EmitBackpressureDropRecords is true, each dropped item produces a terminal lineage record with OutcomeReason = DroppedByBackpressure, so backpressure drops can be queried.

Order Preservation

| Aspect | PreserveOrdering: true (default) | PreserveOrdering: false |
|---|---|---|
| Throughput | Good | Excellent |
| Output order | Matches input | May be out of order |
| Memory | Higher (buffering) | Lower |
| Latency | Higher (waits for slow items) | Lower (emits immediately) |
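The memory and latency costs in the table come from the reorder buffer. The following is a minimal sketch of one. It assumes each input is tagged with a sequence number before dispatch. The Reorder helper is hypothetical, not NPipeline's implementation.

```csharp
using System;
using System.Collections.Generic;

// Results arrive in completion order as (sequence number, value) pairs.
var completions = new[] { (2, "c"), (0, "a"), (3, "d"), (1, "b") };
Console.WriteLine(string.Join(" ", Reorder(completions))); // prints "a b c d"

// Hypothetical reorder buffer: hold early results until every earlier
// sequence number has been emitted, then release the contiguous run.
static IEnumerable<T> Reorder<T>(IEnumerable<(int Seq, T Value)> completions)
{
    var pending = new Dictionary<int, T>();
    var next = 0;
    foreach (var (seq, value) in completions)
    {
        pending[seq] = value;                        // buffered results are the memory cost
        while (pending.Remove(next, out var ready))  // emit only when the next-in-order item is present
        {
            yield return ready;
            next++;
        }
    }
}
```

Result d finishes before b but is held until b arrives. That wait is the latency cost, and the pending dictionary is the memory cost.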

Disable ordering when downstream processing doesn't depend on input order:

csharp
// Shorthand extension - blocking backpressure, no data loss, completion-order output
transform.WithUnorderedParallelism(builder, maxDegreeOfParallelism: 16, maxQueueLength: 100);

// Or via options
builder.WithParallelOptions(transform, new ParallelOptions
{
    MaxDegreeOfParallelism = 16,
    PreserveOrdering = false
});

Thread Safety

CRITICAL: PipelineContext.Items, Parameters, and Properties dictionaries are NOT thread-safe. Do not access them from parallel worker threads.

The Unsafe Pattern

csharp
// ❌ WRONG - data race across worker threads
public override async Task<int> TransformAsync(int input, PipelineContext context, CancellationToken ct)
{
    var count = context.Items.GetValueOrDefault("processed", 0);
    context.Items["processed"] = count + 1;  // ← DATA RACE
    return input;
}

If thread A reads count = 5 and thread B also reads 5 before A writes, both write 6 and one increment is lost.
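The lost update is easy to reproduce outside NPipeline. This standalone sketch uses a hypothetical CountIncrements helper. It increments one counter with ++ and another with Interlocked.Increment from many threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var (plainTotal, atomicTotal) = CountIncrements(1_000_000);
Console.WriteLine($"plain: {plainTotal}, Interlocked: {atomicTotal}");

// Hypothetical helper: many threads increment two counters the same number of times.
static (int Plain, int Atomic) CountIncrements(int iterations)
{
    var plain = 0;
    var atomic = 0;
    Parallel.For(0, iterations, _ =>
    {
        plain++;                           // read, add, write: concurrent updates can overwrite each other
        Interlocked.Increment(ref atomic); // a single atomic step: no update is lost
    });
    return (plain, atomic);
}
```

On a multi-core machine, the plain counter typically ends below the iteration count and varies from run to run. The Interlocked counter always matches the iteration count.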

The Safe Pattern

csharp
// ✅ CORRECT - use atomic operations or locks
public class SafeTransform : TransformNode<int, int>
{
    private long _processedCount = 0;

    public override Task<int> TransformAsync(int input, PipelineContext context, CancellationToken ct)
    {
        // Atomic increment: safe even when several workers run TransformAsync at once
        Interlocked.Increment(ref _processedCount);
        return Task.FromResult(input * 2);
    }
}

Three Approaches to Shared State

| Approach | When to use |
|---|---|
| Interlocked | Simple counters and flags |
| lock | Short critical sections with multiple operations |
| IPipelineStateManager | Complex state that needs coordination or persistence |

csharp
// Atomic operations for counters
Interlocked.Increment(ref _counter);
Interlocked.Add(ref _sum, input);

// Locks for short critical sections
lock (_syncLock) { _total += input; }

// State manager for coordination
var stateManager = context.StateManager;
stateManager?.MarkNodeCompleted(context.CurrentNodeId, context);
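Interlocked works on one variable at a time. When two values must change together, such as a running sum and the count used to compute an average, a short lock keeps them consistent. The following is a standalone sketch with a hypothetical SumAndCount helper.

```csharp
using System;
using System.Threading.Tasks;

var (total, items) = SumAndCount(10_000);
Console.WriteLine($"sum={total}, count={items}, mean={(double)total / items}");

// Hypothetical helper: sum and count are updated as one unit under a lock.
static (long Sum, int Count) SumAndCount(int n)
{
    var gate = new object();
    long sum = 0;
    var count = 0;
    Parallel.For(1, n + 1, i =>
    {
        lock (gate) // keep the section short: two field updates, no I/O, no await
        {
            sum += i;
            count++;
        }
    });
    return (sum, count);
}
```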

Thread Safety Rules

DO:

  • Process independent data items in parallel (inherently safe)
  • Use Interlocked for atomic counter operations
  • Use lock for simple critical sections - keep them short
  • Use IPipelineStateManager for persistent shared state

DON'T:

  • Access context.Items, context.Parameters, or context.Properties from worker threads
  • Share mutable state between nodes without synchronization
  • Hold locks across await calls (C# rejects await inside a lock block, and holding any lock during a long await stalls other workers)
  • Create complex multi-step interlocked sequences (use locks instead)
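The rule about await follows from C# itself, because the compiler rejects await inside a lock block. When a critical section has to await, the usual async-compatible replacement is a SemaphoreSlim with an initial count of 1, entered through WaitAsync. The following is a minimal sketch. AppendConcurrentlyAsync and the shared list are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var log = await AppendConcurrentlyAsync(50);
Console.WriteLine($"{log.Count} entries");

// Hypothetical helper: many concurrent writers append to one non-thread-safe list.
static async Task<List<int>> AppendConcurrentlyAsync(int writers)
{
    using var gate = new SemaphoreSlim(1, 1); // one holder at a time: an async-friendly mutex
    var entries = new List<int>();            // List<T> is not thread-safe on its own

    async Task AppendAsync(int value)
    {
        await gate.WaitAsync();               // waits asynchronously instead of blocking a thread
        try
        {
            await Task.Yield();               // stand-in for awaited work inside the critical section
            entries.Add(value);
        }
        finally
        {
            gate.Release();                   // always release, even if the body throws
        }
    }

    await Task.WhenAll(Enumerable.Range(0, writers).Select(i => AppendAsync(i)));
    return entries;
}
```

Keep the awaited work inside such a section short, because every other writer waits for it.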

Validation

NPipeline includes a ParallelConfigurationRule that validates parallel settings at build time:

csharp
var result = builder.Validate();

// Print any warnings raised by the validation rules
foreach (var warning in result.Warnings)
{
    Console.WriteLine($"⚠️  {warning}");
}

Validation Rules

| Rule | Trigger | Fix |
|---|---|---|
| Queue limits with high parallelism | DOP > 4 without MaxQueueLength | Set MaxQueueLength to 2–10× DOP |
| Order preservation overhead | PreserveOrdering with DOP > 8 | Use .AllowUnorderedOutput() if order is not needed |
| Drop policies without queue bounds | DropOldest/DropNewest without MaxQueueLength | Set MaxQueueLength |
| Thread explosion | DOP > ProcessorCount × 4 | Reduce DOP or verify the workload requires it |
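To make the four triggers concrete, they can be written as plain conditions. This is a hypothetical restatement of the table, not the source of ParallelConfigurationRule, which may word or order its warnings differently. The dropPolicy flag stands for a DropOldest or DropNewest QueuePolicy.

```csharp
using System;
using System.Collections.Generic;

var warnings = CheckParallelSettings(dop: 16, maxQueueLength: null, preserveOrdering: true,
                                     dropPolicy: false, processorCount: Environment.ProcessorCount);
foreach (var warning in warnings)
{
    Console.WriteLine($"⚠️  {warning}");
}

// Hypothetical re-statement of the validation table as simple checks.
static List<string> CheckParallelSettings(int dop, int? maxQueueLength, bool preserveOrdering,
                                          bool dropPolicy, int processorCount)
{
    var found = new List<string>();
    if (dop > 4 && maxQueueLength is null)
        found.Add($"DOP {dop} without MaxQueueLength: consider {dop * 2} to {dop * 10}");
    if (preserveOrdering && dop > 8)
        found.Add("PreserveOrdering with DOP > 8: use AllowUnorderedOutput() if order is not needed");
    if (dropPolicy && maxQueueLength is null)
        found.Add("Drop policy without MaxQueueLength: set MaxQueueLength");
    if (dop > processorCount * 4)
        found.Add($"DOP {dop} exceeds ProcessorCount × 4 ({processorCount * 4})");
    return found;
}
```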

Quick Fix Examples

csharp
// ⚠️ High parallelism without queue limits
new ParallelOptions { MaxDegreeOfParallelism = 16 }

// ✅ Fix: bound the queue
new ParallelOptions { MaxDegreeOfParallelism = 16, MaxQueueLength = 100 }

// ⚠️ Preserving order with high parallelism
.RunParallel(builder, opt => opt.MaxDegreeOfParallelism(16))

// ✅ Fix: disable ordering for throughput
.RunParallel(builder, opt => opt.MaxDegreeOfParallelism(16).AllowUnorderedOutput())

Metrics

ParallelExecutionMetrics tracks per-node metrics:

| Metric | Description |
|---|---|
| Processed | Total items completed |
| Enqueued | Total items queued |
| DroppedNewest | Items dropped (DropNewest policy) |
| DroppedOldest | Items dropped (DropOldest policy) |
| RetryEvents | Total retry attempts |
| ItemsWithRetry | Items that required at least one retry |
| MaxItemRetryAttempts | Highest retry count for a single item |
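Counters like RetryEvents and ItemsWithRetry can be kept with Interlocked.Add and Interlocked.Increment. A running maximum like MaxItemRetryAttempts is usually maintained with a compare-and-swap loop over Interlocked.CompareExchange. The following is a standalone sketch under those assumptions. SummarizeRetries is hypothetical, and NPipeline may track these values differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

Console.WriteLine(SummarizeRetries(new[] { 0, 3, 1, 7, 2, 0, 5 })); // prints (18, 5, 7)

// Hypothetical helper: each element is the retry count of one processed item.
static (long RetryEvents, int ItemsWithRetry, int MaxItemRetryAttempts) SummarizeRetries(int[] retriesPerItem)
{
    long retryEvents = 0;
    var itemsWithRetry = 0;
    var maxAttempts = 0;
    Parallel.ForEach(retriesPerItem, retries =>
    {
        Interlocked.Add(ref retryEvents, retries);
        if (retries > 0) Interlocked.Increment(ref itemsWithRetry);

        // Lock-free max: retry the compare-and-swap until our value is stored
        // or another worker has already stored something at least as large.
        var current = Volatile.Read(ref maxAttempts);
        while (retries > current)
        {
            var observed = Interlocked.CompareExchange(ref maxAttempts, retries, current);
            if (observed == current) break; // our value won
            current = observed;             // someone else updated it first; compare again
        }
    });
    return (retryEvents, itemsWithRetry, maxAttempts);
}
```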

Metrics are reported to the IExecutionObserver at MetricsInterval. Adjust the interval based on your monitoring needs:

csharp
// Fine-grained monitoring for real-time systems
new ParallelOptions { MetricsInterval = TimeSpan.FromMilliseconds(500) }

// Reduced overhead for batch processing
new ParallelOptions { MetricsInterval = TimeSpan.FromSeconds(10) }

Best Practices

Choosing Degree of Parallelism

  • Start with ProcessorCount for CPU-bound work
  • Use ProcessorCount × 2–4 for I/O-bound work
  • Profile to find the optimal balance - too high causes context switching overhead
  • Start small (DOP = 2) and increase incrementally while monitoring

Bounding Queues

  • Always set MaxQueueLength for DOP > 4 to prevent unbounded memory growth
  • Rule of thumb: MaxQueueLength = 2–10× MaxDegreeOfParallelism
  • Use OutputBufferCapacity to limit how far ahead parallel nodes get

Resource Contention

  • Use connection pooling for databases
  • Implement rate limiting for external APIs
  • Consider batching requests to reduce contention
  • Avoid blocking I/O in worker threads - use async/await

Debugging

  • Include thread IDs in log messages: logger.LogInformation("Item {Id} on thread {ThreadId}", item.Id, Environment.CurrentManagedThreadId)
  • Use structured logging with correlation IDs
  • Monitor queue depths and worker utilization

Performance Optimization Checklist

  • [ ] Profile baseline before adding parallelism
  • [ ] Choose appropriate workload type or DOP
  • [ ] Set queue limits to prevent unbounded growth
  • [ ] Disable order preservation when not needed
  • [ ] Implement thread-safe shared state access
  • [ ] Validate configuration before production deployment
  • [ ] Monitor metrics and tune accordingly

Common Pitfalls

| Pitfall | Consequence | Fix |
|---|---|---|
| Over-parallelization | Context switching, thread starvation | Profile and find the optimal DOP |
| Unsynchronized shared state | Data races, silent corruption | Use Interlocked, lock, or IPipelineStateManager |
| Unbounded queues | Out-of-memory under load | Set MaxQueueLength |
| Unnecessary ordering | Higher latency and memory | Set PreserveOrdering = false |
| Blocking calls in workers | Thread pool starvation | Use async/await |

Example: Multi-Stage Pipeline

csharp
public class FileProcessingPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource(new CsvSourceNode<RawRecord>(uri), "source");

        // File I/O stage - I/O-bound
        var read = builder.AddTransform<FileReaderNode, RawRecord, FileContent>("read");
        read.RunParallel(builder, ParallelWorkloadType.IoBound);

        // Parse stage - CPU-bound
        var parse = builder.AddTransform<ParserNode, FileContent, ParsedData>("parse");
        parse.RunParallel(builder, ParallelWorkloadType.CpuBound);

        // Upload stage - network-bound
        var upload = builder.AddTransform<UploaderNode, ParsedData, UploadResult>("upload");
        upload.RunParallel(builder, ParallelWorkloadType.NetworkBound);

        var sink = builder.AddSink(new DatabaseSinkNode<UploadResult>(config), "sink");

        builder.Connect(source, read);
        builder.Connect(read, parse);
        builder.Connect(parse, upload);
        builder.Connect(upload, sink);
    }
}


Released under the MIT License.