
Lineage

The NPipeline.Extensions.Lineage package provides data lineage tracking: it records the path each item takes through a pipeline, including transformations, filtering, dead-lettering, and errors. Lineage is important for compliance (GDPR, HIPAA), debugging, and data quality auditing.

Installation

```bash
dotnet add package NPipeline.Extensions.Lineage
```

Quick Start

```csharp
// Configure the pipeline as usual, then register lineage with the built-in logging sink.
services.AddNPipeline(builder => { ... });
services.AddNPipelineLineage<LoggingPipelineLineageSink>();
```

Items flowing through the pipeline are automatically wrapped in LineagePacket<T> with a correlation ID. At each node, the lineage system records the traversal.

Architecture

```
Source → wraps items in LineagePacket<T> (assigns CorrelationId)
       → each node appends LineageRecord events
       → terminal event per sampled correlation
       → ILineageCollector (thread-safe ConcurrentDictionary)
       → ILineageSink / IPipelineLineageSink (export)
```
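The following standalone C# sketch makes the flow above concrete. It does not use NPipeline. The tuple packets, the Record helper, the node names, and the string outcomes are hypothetical stand-ins for LineagePacket<T>, LineageRecord, and ILineageCollector. The sketch only shows the idea: each item is tagged with a correlation ID at the source, and ordered events are appended per correlation into a ConcurrentDictionary-backed store.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for ILineageCollector: an ordered event queue per correlation ID.
var collector = new ConcurrentDictionary<Guid, ConcurrentQueue<(string NodeId, string? Outcome, bool IsTerminal)>>();

// Append one traversal event; safe to call from nodes running on different threads.
void Record(Guid correlationId, string nodeId, string? outcome, bool isTerminal) =>
    collector.GetOrAdd(correlationId, _ => new()).Enqueue((nodeId, outcome, isTerminal));

// Source step: wrap each raw item with a fresh correlation ID (stand-in for LineagePacket<T>).
var packets = new[] { "a", "b", "c" }
    .Select(item => (CorrelationId: Guid.NewGuid(), Data: item))
    .ToList();

// Items are processed in parallel, but each item's own events are appended in order.
Parallel.ForEach(packets, packet =>
{
    Record(packet.CorrelationId, "source", null, isTerminal: false);
    Record(packet.CorrelationId, "transform", null, isTerminal: false);
    Record(packet.CorrelationId, "sink", "Emitted", isTerminal: true); // terminal event
});

// The traversal path is the ordered list of node IDs recorded for one correlation.
var path = collector[packets[0].CorrelationId].Select(e => e.NodeId).ToArray();
Console.WriteLine(string.Join(" -> ", path)); // source -> transform -> sink
```

Different items run concurrently, but each item is handled by one thread at a time. Its queue therefore keeps node order, and the traversal path can be read straight from it.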

Core Contracts

| Type | Role |
| --- | --- |
| ILineage | Build-time adapter creation, stream wrapping/unwrapping, pipeline report recording |
| LineageRecord | Single traversal event (node ID, outcome, correlation ID, optional snapshots) |
| ILineageCollector | Thread-safe storage with per-correlation event tracking |
| ILineageSink | Item-level export (called per record) |
| IPipelineLineageSink | Pipeline-level export (called once at the end of a run) |
| LineagePacket<T> | Wraps items with Data, CorrelationId, TraversalPath, LineageRecords, Collect |

Runtime stream contract: When item-level lineage is enabled, streams carry LineagePacket<T> items throughout execution, not bare T items. Before execution starts, the RuntimePipelineBinder normalizes execution options (such as route predicates) so that they operate on LineagePacket<T>. Sinks receive an IDataStream<LineagePacket<T>> input, and the lineage adapter unwraps each packet so that your ISinkNode<T>.ConsumeAsync implementation sees the inner T. This unwrapping is strictly typed: if the input stream is not an IDataStream<LineagePacket<T>>, the build fails.
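The unwrapping step can be sketched in plain C#. This is an illustration under stated assumptions, not the adapter's real code. UnwrapForSink is a hypothetical helper, and (Guid, T) tuples stand in for LineagePacket<T>. The helper passes only the inner values to the sink and rejects a stream whose element type does not match, which mirrors the strict typing rule.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical adapter: the sink receives plain T values from a stream of (CorrelationId, Data)
// tuples standing in for LineagePacket<T>. Any other element type is rejected up front.
IEnumerable<T> UnwrapForSink<T>(object stream) =>
    stream is IEnumerable<(Guid, T)> packets
        ? packets.Select(p => p.Item2)
        : throw new InvalidOperationException(
            $"Expected a packet stream of {typeof(T).Name}, got {stream.GetType().Name}.");

var packetStream = new List<(Guid CorrelationId, int Data)>
{
    (Guid.NewGuid(), 1),
    (Guid.NewGuid(), 2),
};

// The sink body only ever sees the inner values.
foreach (var value in UnwrapForSink<int>(packetStream))
    Console.WriteLine(value); // prints 1, then 2
```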

LineageRecord Fields

| Field | Description |
| --- | --- |
| CorrelationId | Tracks the item across all nodes |
| NodeId | Node that produced this record |
| PipelineId / PipelineName | Pipeline context |
| OutcomeReason | Terminal or intermediate outcome |
| IsTerminal | Whether this is the final record for this correlation |
| TraversalPath | Ordered list of node IDs visited |
| ContributorCorrelationIds | For joins/aggregates: IDs of the contributing items |
| InputSnapshot / OutputSnapshot | JSON snapshots (when hop snapshots are enabled) |
| Data | Item payload (subject to redaction) |
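For joins and aggregates, ContributorCorrelationIds links an output back to its inputs. The sketch below shows that idea with LINQ's Join over hypothetical tuple packets. It assumes, for illustration only, that the joined item gets a fresh correlation ID. This page does not document how NPipeline assigns that ID.

```csharp
using System;
using System.Linq;

// Two upstream inputs, each already tagged with its own correlation ID (hypothetical tuples).
var orders = new[] { (CorrelationId: Guid.NewGuid(), CustomerId: 7, Amount: 120m) };
var customers = new[] { (CorrelationId: Guid.NewGuid(), CustomerId: 7, Name: "Ada") };

// Assumption: the joined item gets a new correlation ID and keeps the IDs of the inputs
// that produced it, mirroring the ContributorCorrelationIds field.
var joined = orders
    .Join(customers,
          o => o.CustomerId,
          c => c.CustomerId,
          (o, c) => (CorrelationId: Guid.NewGuid(),
                     Data: $"{c.Name}: {o.Amount}",
                     ContributorCorrelationIds: new[] { o.CorrelationId, c.CorrelationId }))
    .ToList();

Console.WriteLine(joined[0].Data); // Ada: 120
```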

Terminal Outcomes

| LineageOutcomeReason | Description |
| --- | --- |
| Emitted | Item successfully reached a sink |
| FilteredOut | Item was removed by a filter node |
| ConsumedWithoutEmission | Item was consumed by an aggregate or reduction |
| Error | Item processing failed |
| DeadLettered | Item was sent to a dead letter sink |
| DroppedByBackpressure | Item was dropped by a parallel queue policy |
| Joined | Item was part of a join operation |
| Aggregated | Item was part of an aggregation |

Configuration

LineageOptions Presets

Two built-in presets cover most use cases:

| Preset | Sampling | Intermediate Records | Snapshots | Best For |
| --- | --- | --- | --- | --- |
| LineageOptions.FastLineage | Low | Disabled | Disabled | High-volume production |
| LineageOptions.CompleteLineage | 100% | Enabled | Enabled | Debugging, compliance |

Customize with the .With() method:

```csharp
var options = LineageOptions.FastLineage.With(sampleEvery: 100, redactData: true);
```

Sampling

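Deterministic sampling is hash-based: an item is tracked when hash(correlationId) % SampleEvery == 0. A given correlation ID always gets the same decision, so sampling is consistent across runs.

Random sampling is probabilistic: each item is tracked with probability 1/SampleEvery, and a different subset of items is tracked on each run.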

| Scenario | Rate | SampleEvery |
| --- | --- | --- |
| Compliance / audit trails | 100% | 1 |
| Production monitoring | 1–10% | 10–100 |
| Development / debugging | 10–50% | 2–10 |
| High-volume analytics | 0.1–1% | 100–1000 |
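The following sketch shows both sampling modes. The rate arithmetic is simple: SampleEvery = 100 tracks about 1 item in 100 (1%). This page does not specify NPipeline's hash function, and it does not say that correlation IDs are GUIDs. The sketch assumes both. It uses a 32-bit FNV-1a hash over the GUID bytes because .NET randomizes string.GetHashCode() per process, so that hash would not be consistent from run to run. StableHash, SampleDeterministic, and SampleRandom are hypothetical helpers. Random.Shared requires .NET 6 or later.

```csharp
using System;

// Stable 32-bit FNV-1a hash of the correlation ID's bytes. string.GetHashCode() is randomized
// per process in .NET, so it would not give run-to-run consistency; this hash does.
uint StableHash(Guid correlationId)
{
    uint hash = 2166136261u;
    foreach (byte b in correlationId.ToByteArray())
    {
        hash ^= b;
        hash = unchecked(hash * 16777619u);
    }
    return hash;
}

// Deterministic: a given correlation ID is always in, or always out of, the sample.
bool SampleDeterministic(Guid correlationId, int sampleEvery) =>
    StableHash(correlationId) % (uint)sampleEvery == 0;

// Random: each item is tracked with probability 1 / sampleEvery, with a new draw every run.
bool SampleRandom(int sampleEvery) => Random.Shared.Next(sampleEvery) == 0;

var id = Guid.Parse("3f2504e0-4f89-11d3-9a0c-0305e82c3301");
Console.WriteLine(SampleDeterministic(id, 100) == SampleDeterministic(id, 100)); // True
```

With sampleEvery set to 1, both functions track every item, which matches the 100% compliance row in the table.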

Data Redaction

When redactData is set to true, the LineageRecord.Data field is set to null, while traversal context, outcomes, and correlation metadata are preserved. Use redaction for pipelines that handle PII or other sensitive data.
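The following minimal sketch shows what redaction clears and what it keeps. It uses a hypothetical tuple that mirrors a few LineageRecord fields. ApplyRedaction is an illustrative helper, not part of the package.

```csharp
using System;

// Hypothetical tuple mirroring a few LineageRecord fields; Data holds the item payload.
// Redaction clears only Data, leaving correlation, node, and outcome metadata intact.
(Guid CorrelationId, string NodeId, string Outcome, object? Data) ApplyRedaction(
    (Guid CorrelationId, string NodeId, string Outcome, object? Data) entry, bool redactData)
{
    if (redactData)
        entry.Data = null; // tuples are value types, so only this copy is changed
    return entry;
}

var original = (CorrelationId: Guid.NewGuid(), NodeId: "sink", Outcome: "Emitted",
                Data: (object?)"alice@example.com");
var redacted = ApplyRedaction(original, redactData: true);

Console.WriteLine(redacted.Data is null); // True
Console.WriteLine(redacted.Outcome);      // Emitted
```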

Hop Snapshots

When captureHopSnapshots is set to true, each node records JSON snapshots of the item before and after its transformation. Circular references are handled silently. The performance impact is high, so enable snapshots only with conservative sampling (SampleEvery ≥ 100).
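A hop snapshot amounts to serializing the item on either side of a node. This sketch uses System.Text.Json with ReferenceHandler.IgnoreCycles (.NET 6 or later) as one plausible way to tolerate circular references. This page does not say whether NPipeline uses this serializer or this setting. The Snapshot helper and the sample item are hypothetical.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Snapshot serializer settings. ReferenceHandler.IgnoreCycles writes null where a reference
// cycle is detected instead of throwing, one way to get the silent handling described above.
var snapshotOptions = new JsonSerializerOptions { ReferenceHandler = ReferenceHandler.IgnoreCycles };

// Serialize using the runtime type of the item.
string Snapshot(object? item) => JsonSerializer.Serialize(item, snapshotOptions);

// A transform node snapshots the item on the way in and on the way out.
var input = new { Id = 1, Name = "alice" };
var output = new { Id = 1, Name = input.Name.ToUpperInvariant() };

var inputSnapshot = Snapshot(input);
var outputSnapshot = Snapshot(output);
Console.WriteLine(inputSnapshot);  // {"Id":1,"Name":"alice"}
Console.WriteLine(outputSnapshot); // {"Id":1,"Name":"ALICE"}
```

Each hop serializes the item twice, which helps explain why the section recommends conservative sampling when snapshots are on.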

Overflow Policies

The overflow policy controls what happens when the materialization cap is exceeded:

| Policy | Behavior | Use Case |
| --- | --- | --- |
| Degrade (default) | Switches to streaming positional mapping | Production: memory-safe |
| Strict | Throws immediately | Development: fail-fast |
| WarnContinue | Logs a warning and continues | Testing: see all events |

```csharp
var options = new LineageOptions(
    sampleEvery: 100,
    deterministicSampling: true,
    redactData: true,
    materializationCap: 10_000,
    overflowPolicy: LineageOverflowPolicy.Degrade);
```
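The three policies differ only in what happens to events beyond the cap. The sketch below models that decision with a hypothetical Materialize helper and string policy names. It simplifies Degrade to counting pass-through events and does not reproduce the library's streaming positional mapping.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a materialization cap. Policy names follow the table above, but
// "Degrade" is simplified to "stop buffering and pass further events straight through".
(List<int> Buffered, int PassedThrough, int Warnings) Materialize(
    IEnumerable<int> events, int cap, string policy)
{
    var buffered = new List<int>();
    int passedThrough = 0, warnings = 0;
    foreach (var e in events)
    {
        if (buffered.Count < cap) { buffered.Add(e); continue; }
        switch (policy)
        {
            case "Strict":       // fail fast on the first event past the cap
                throw new InvalidOperationException($"Materialization cap of {cap} exceeded.");
            case "WarnContinue": // keep everything; a real implementation would log a warning
                warnings++;
                buffered.Add(e);
                break;
            case "Degrade":      // memory stays bounded at the cap
                passedThrough++;
                break;
            default:
                throw new ArgumentException($"Unknown policy '{policy}'.", nameof(policy));
        }
    }
    return (buffered, passedThrough, warnings);
}

var result = Materialize(new[] { 1, 2, 3, 4, 5 }, cap: 3, policy: "Degrade");
Console.WriteLine($"{result.Buffered.Count} buffered, {result.PassedThrough} passed through");
// 3 buffered, 2 passed through
```

The trade-off matches the table. Strict surfaces the problem at once, WarnContinue keeps every event at the cost of unbounded memory, and Degrade keeps memory bounded.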

Completeness Guarantees

| Option | Description |
| --- | --- |
| EnsurePerInputTerminalRecord | Every sampled correlation gets a terminal closure |
| EmitIntermediateNodeRecords | Emit non-terminal records at each node |
| EmitBackpressureDropRecords | Emit terminal records for items dropped by queue policies |

Querying Lineage

```csharp
// collector is an ILineageCollector instance (for example, resolved from the DI container);
// correlationId is the ID of the item being investigated.

// Get the full history for a correlation
var history = collector.GetCorrelationHistory(correlationId);

// Get the terminal outcome
var terminal = collector.GetTerminalReason(correlationId);

// Find unresolved correlations (items that entered but never got a terminal event)
var unresolved = collector.GetUnresolvedCorrelations();

// All records (for export)
var allRecords = collector.GetAllRecords();
```
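The following sketch illustrates what these queries mean over a flat, in-memory event list. The local functions reuse the collector's method names for readability. They are hypothetical stand-ins, not the ILineageCollector implementation, and they return simplified types (node IDs instead of full LineageRecord objects).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flat event log standing in for the collector's storage.
var a = Guid.NewGuid();
var b = Guid.NewGuid();
var events = new List<(Guid CorrelationId, string NodeId, string? Outcome, bool IsTerminal)>
{
    (a, "source", null, false),
    (a, "filter", "FilteredOut", true),
    (b, "source", null, false),
    (b, "enrich", null, false), // b never reaches a terminal event
};

// Ordered history (node IDs) for one correlation.
IEnumerable<string> GetCorrelationHistory(Guid id) =>
    events.Where(e => e.CorrelationId == id).Select(e => e.NodeId);

// Outcome of the terminal event, or null while the correlation is still open.
string? GetTerminalReason(Guid id) =>
    events.FirstOrDefault(e => e.CorrelationId == id && e.IsTerminal).Outcome;

// Correlations that entered the pipeline but never received a terminal event.
IEnumerable<Guid> GetUnresolvedCorrelations() =>
    events.GroupBy(e => e.CorrelationId)
          .Where(g => !g.Any(e => e.IsTerminal))
          .Select(g => g.Key);

Console.WriteLine(string.Join(" -> ", GetCorrelationHistory(b))); // source -> enrich
Console.WriteLine(GetTerminalReason(a));                           // FilteredOut
Console.WriteLine(GetUnresolvedCorrelations().Single() == b);      // True
```

An unresolved correlation such as b is exactly the kind of gap that EnsurePerInputTerminalRecord is meant to close.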

Pipeline Lineage Sinks

Built-in

| Sink | Description |
| --- | --- |
| LoggingPipelineLineageSink | Logs the lineage report via ILogger |

Custom

Implement ILineageSink (per-record) or IPipelineLineageSink (per-run):

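```csharp
using System.Data;
using System.Threading;
using System.Threading.Tasks;

// Per-run sink: called once with the full report at the end of the run.
public class DatabaseLineageSink : IPipelineLineageSink
{
    private readonly IDbConnection _connection;
    public DatabaseLineageSink(IDbConnection connection) => _connection = connection;

    public async Task RecordAsync(PipelineLineageReport report, CancellationToken ct)
    {
        // Persist the report via _connection; keep the I/O asynchronous and honor ct.
    }
}
```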

Dependency Injection

```csharp
// With logging sink
services.AddNPipelineLineage<LoggingPipelineLineageSink>();

// With custom sink
services.AddNPipelineLineage<DatabaseLineageSink>();

// With factory
services.AddNPipelineLineage(sp =>
    new DatabaseLineageSink(sp.GetRequiredService<IDbConnection>()));

// With custom collector and sink
services.AddNPipelineLineage<CustomCollector, DatabaseLineageSink>();
```

Registered Services

| Service | Implementation | Lifetime |
| --- | --- | --- |
| ILineage | LineageService | Scoped |
| ILineageCollector | LineageCollector | Scoped |
| IPipelineLineageSink | User-specified | Scoped |
| ILineageFactory | DiLineageFactory | Scoped |
| IPipelineLineageSinkProvider | DefaultPipelineLineageSinkProvider | Scoped |

Configuration Examples

Production (low overhead, privacy-safe):

```csharp
var options = new LineageOptions(
    sampleEvery: 100,
    deterministicSampling: true,
    redactData: true,
    materializationCap: 10_000,
    overflowPolicy: LineageOverflowPolicy.Degrade,
    emitBackpressureDropRecords: true);
```

Development (full visibility):

```csharp
var options = new LineageOptions(
    sampleEvery: 1,
    deterministicSampling: true,
    redactData: false,
    emitIntermediateNodeRecords: true,
    ensurePerInputTerminalRecord: true,
    captureHopSnapshots: true,
    overflowPolicy: LineageOverflowPolicy.WarnContinue);
```

Performance

Option Impact

| Option | Throughput Impact | Memory Impact |
| --- | --- | --- |
| Lower SampleEvery | High | High |
| RedactData = false | Medium | High |
| CaptureHopSnapshots = true | High | High |
| EmitIntermediateNodeRecords = true | Medium | Medium |
| IncludeContributorCorrelationIds = true | Low–Medium | Medium |
| EnsurePerInputTerminalRecord = true | Low | Low |

Best Practices

  1. Start with FastLineage in production and customize it with .With()
  2. Use deterministic sampling for debugging, so the same items are tracked across runs
  3. Enable redaction for PII and other sensitive data
  4. Use the Degrade overflow policy in production, since it is memory-safe
  5. Implement sinks asynchronously and avoid blocking I/O in them
  6. Keep EmitBackpressureDropRecords = true so that dropped items remain queryable

Use Cases

| Use Case | Key Options |
| --- | --- |
| Compliance audit trails | 100% sampling, deterministic, not redacted, ensure terminal records |
| Correlation debugging | Query GetCorrelationHistory(id) to trace an item's path |
| Backpressure visibility | EmitBackpressureDropRecords = true for drop dashboards |
| Join/aggregate provenance | IncludeContributorCorrelationIds = true |
| Unresolved correlation monitoring | GetUnresolvedCorrelations() for completeness gaps |
| Privacy-conscious monitoring | RedactData = true, 1–10% sampling |
| Node health analysis | Group records by node ID; count errors and dead-letters per node |
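The node health analysis use case is a group-and-count over lineage records. In this sketch the records are hypothetical (NodeId, Outcome) tuples, with outcomes written as strings that match the LineageOutcomeReason names. With the real package, the records would come from the collector or an exported report.

```csharp
using System;
using System.Linq;

// Hypothetical terminal records; real ones would come from the collector or a report.
var records = new[]
{
    (NodeId: "parse", Outcome: "Error"),
    (NodeId: "parse", Outcome: "DeadLettered"),
    (NodeId: "parse", Outcome: "Emitted"),
    (NodeId: "enrich", Outcome: "Error"),
    (NodeId: "enrich", Outcome: "Emitted"),
};

// Group by node, count failure-type outcomes, and list the unhealthiest node first.
var health = records
    .GroupBy(r => r.NodeId)
    .Select(g => (NodeId: g.Key,
                  Errors: g.Count(r => r.Outcome == "Error"),
                  DeadLettered: g.Count(r => r.Outcome == "DeadLettered")))
    .OrderByDescending(h => h.Errors + h.DeadLettered)
    .ToList();

foreach (var h in health)
    Console.WriteLine($"{h.NodeId}: errors={h.Errors}, deadLettered={h.DeadLettered}");
// parse: errors=1, deadLettered=1
// enrich: errors=1, deadLettered=0
```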


Released under the MIT License.