
Data Lineage

Prerequisites: Defining Pipelines, Dependency Injection

The NPipeline.Extensions.Lineage package tracks how data flows through your pipeline: which node produced each item, which transforms it passed through, and where it ended up.

Installation

```bash
dotnet add package NPipeline.Extensions.Lineage
```

Setup

```csharp
using System.Reflection; // for Assembly

services.AddNPipeline(Assembly.GetExecutingAssembly());
services.AddNPipelineLineage(); // logs lineage reports via ILogger
```

Enable item-level lineage in the pipeline definition:

```csharp
public void Define(PipelineBuilder builder, PipelineContext context)
{
    builder.EnableItemLevelLineage();
    // ... add nodes and connections
}
```

What Gets Tracked

Pipeline-Level Lineage

After each run, a PipelineLineageReport is generated containing:

  • Pipeline name, ID, and run ID
  • All nodes with type information (NodeLineageInfo: ID, type name, input/output types)
  • All edges between nodes (EdgeLineageInfo: source → target)
  • The complete DAG structure
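The report lists every node and every source → target edge, so it contains enough information to rebuild the pipeline graph offline, for example to print the nodes in execution order. The following is a minimal sketch. It assumes node IDs are plain strings, and it models each EdgeLineageInfo as a hypothetical (Source, Target) tuple rather than the package's real type. It orders the nodes with Kahn's algorithm and rejects cycles:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for the report's nodes and edges (not NPipeline types).
string[] nodes = { "source", "parse", "audit", "sink" };
(string Source, string Target)[] edges =
{
    ("source", "parse"), ("source", "audit"),
    ("parse", "sink"), ("audit", "sink"),
};

// Kahn's algorithm: repeatedly emit a node whose incoming edges have all been emitted.
List<string> TopologicalOrder(string[] ids, (string Source, string Target)[] links)
{
    var inDegree = ids.ToDictionary(id => id, _ => 0);
    foreach (var (_, target) in links) inDegree[target]++;

    var ready = new Queue<string>(ids.Where(id => inDegree[id] == 0));
    var order = new List<string>();
    while (ready.Count > 0)
    {
        var node = ready.Dequeue();
        order.Add(node);
        foreach (var (source, target) in links)
        {
            if (source == node && --inDegree[target] == 0) ready.Enqueue(target);
        }
    }

    // Nodes that never reached in-degree 0 sit on a cycle, so the graph is not a DAG.
    if (order.Count != ids.Length) throw new InvalidOperationException("Cycle detected.");
    return order;
}

Console.WriteLine(string.Join(" -> ", TopologicalOrder(nodes, edges)));
// prints: source -> parse -> audit -> sink
```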

Item-Level Lineage

When enabled with EnableItemLevelLineage(), each item is assigned a unique correlation ID, and the following records are collected under that ID:

  • Source record - which node produced the item
  • Transform records - each transformation the item passed through
  • Sink/terminal record - where the item was consumed
  • Error records - if the item failed processing
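The records that share a correlation ID are what let you replay an item's path. The sketch below is a standalone illustration. The (CorrelationId, Hop, NodeId, Kind) tuple and the kind names are hypothetical stand-ins for the package's record type. It groups a flat record list by correlation ID and orders each group by hop:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flat record stream: (correlation ID, hop index, node, record kind).
var records = new List<(string CorrelationId, int Hop, string NodeId, string Kind)>
{
    ("item-1", 0, "csv-source", "Source"),
    ("item-2", 0, "csv-source", "Source"),
    ("item-1", 1, "normalize", "Transform"),
    ("item-2", 1, "normalize", "Error"),
    ("item-1", 2, "db-sink", "Sink"),
};

// Group by correlation ID, then order each group by hop to rebuild the item's journey.
Dictionary<string, string> Paths(IEnumerable<(string CorrelationId, int Hop, string NodeId, string Kind)> all) =>
    all.GroupBy(r => r.CorrelationId)
       .ToDictionary(
           g => g.Key,
           g => string.Join(" -> ", g.OrderBy(r => r.Hop).Select(r => $"{r.NodeId} ({r.Kind})")));

foreach (var (id, path) in Paths(records))
    Console.WriteLine($"{id}: {path}");
```

Here item-1 reaches its sink, while item-2 ends at an error record in the normalize node.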

Configuration

LineageOptions controls all lineage behavior. Two presets are available:

```csharp
// Fast: throughput-oriented, reduced detail (default)
builder.EnableItemLevelLineage(LineageOptions.FastLineage);

// Complete: full detail for compliance and debugging
builder.EnableItemLevelLineage(LineageOptions.CompleteLineage);
```

Or customize individual settings:

```csharp
builder.EnableItemLevelLineage(options => options with
{
    SampleEvery = 100,
    DeterministicSampling = true,
    RedactData = true,
    CaptureHopSnapshots = false,
    OverflowPolicy = LineageOverflowPolicy.Degrade,
    MaterializationCap = 10000
});
```

Sampling

| Setting | Default | Description |
|---|---|---|
| SampleEvery | 100 | Track lineage for every Nth item (1 = all items) |
| DeterministicSampling | true | true: hash-based selection (reproducible); false: random selection (representative) |
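The following sketch shows both sampling strategies behind the DeterministicSampling setting. It is not NPipeline's implementation. The FNV-1a hash, the modulo test, and the helper names are assumptions chosen for illustration. A stable hash matters because string.GetHashCode() is randomized per process in .NET, so it would not pick the same items from one run to the next.

```csharp
using System;
using System.Text;

// Stable 32-bit FNV-1a hash (an illustrative choice, not necessarily NPipeline's).
uint Fnv1a(string value)
{
    uint hash = 2166136261u;
    foreach (byte b in Encoding.UTF8.GetBytes(value))
    {
        hash ^= b;
        hash = unchecked(hash * 16777619u);
    }
    return hash;
}

// Deterministic: the same correlation ID always gets the same decision.
bool SampleDeterministic(string correlationId, int sampleEvery) =>
    sampleEvery <= 1 || Fnv1a(correlationId) % (uint)sampleEvery == 0;

// Random: about 1 in sampleEvery items, but a different subset on each run.
bool SampleRandom(int sampleEvery) =>
    sampleEvery <= 1 || Random.Shared.Next(sampleEvery) == 0;

Console.WriteLine(SampleDeterministic("order-42", 10)); // same answer on every run
Console.WriteLine(SampleRandom(10));                    // may differ between runs
```

With deterministic sampling, re-running the same input tracks the same items, which makes a lineage trail easy to reproduce. Random sampling avoids any bias a particular hash might have toward certain ID patterns.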

Sampling rate guidelines:

| Scenario | Recommended rate |
|---|---|
| Compliance / audit | SampleEvery = 1 (100%) |
| Production monitoring | SampleEvery = 10–100 (1–10%) |
| Development / debugging | SampleEvery = 2–10 (10–50%) |
| High-volume analytics | SampleEvery = 100–1000 (0.1–1%) |

Data Capture Options

| Setting | Default | Description |
|---|---|---|
| CaptureHopTimestamps | true | Record timestamps at each node |
| CaptureDecisions | true | Record resilience decisions (skip, retry, dead-letter) |
| CaptureObservedCardinality | true | Record input/output counts per node |
| CaptureAncestryMapping | false | Track full ancestry chain |
| CaptureHopSnapshots | false | Serialize item before/after each node (expensive) |
| MaxHopRecordsPerItem | 256 | Maximum lineage records per correlation ID |
| RedactData | true | Set lineageRecord.Data to null (useful for PII) |

⚠️ Warning: CaptureHopSnapshots serializes items at every node hop. Use with SampleEvery ≥ 100 to limit overhead.
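You can think of the per-item cap and the redaction setting as a small recorder. This is a hypothetical sketch, not the package's internals. Record, its parameters, and the (NodeId, Data) tuple are illustrative names that mirror MaxHopRecordsPerItem and RedactData.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store: correlation ID -> recorded hops.
var store = new Dictionary<string, List<(string NodeId, object? Data)>>();

// Records one hop, honoring a per-item cap (cf. MaxHopRecordsPerItem) and
// optional redaction (cf. RedactData). Returns false once the cap is reached.
bool Record(string correlationId, string nodeId, object? data, int maxHopRecordsPerItem, bool redactData)
{
    if (!store.TryGetValue(correlationId, out var hops))
    {
        hops = new List<(string NodeId, object? Data)>();
        store[correlationId] = hops;
    }

    if (hops.Count >= maxHopRecordsPerItem) return false; // cap reached: drop further hops

    // Redaction keeps the hop itself but discards the payload, e.g. PII.
    hops.Add((nodeId, redactData ? null : data));
    return true;
}

// An item that passes through 300 nodes keeps only its first 256 hop records.
for (var i = 0; i < 300; i++)
    Record("item-1", $"node-{i}", new { Email = "jane@example.com" }, maxHopRecordsPerItem: 256, redactData: true);

Console.WriteLine(store["item-1"].Count); // prints: 256
```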

Overflow Policies

When the MaterializationCap is reached:

| Policy | Behavior | Use when |
|---|---|---|
| Degrade (default) | Switches to streaming positional mapping | Production: graceful degradation |
| Strict | Throws immediately | Memory limits are critical |
| WarnContinue | Logs a warning and continues | Development / debugging |
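The three policies differ only in how they handle the first item past the cap. The sketch below is hypothetical: it models the policies as strings named after the LineageOverflowPolicy values. The buffering logic, including the assumption that WarnContinue keeps materializing past the cap, is illustrative and not documented behavior.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of what each policy does once the materialization cap is hit.
string Materialize(List<int> buffer, int item, int cap, string policy, List<string> log)
{
    if (buffer.Count < cap)
    {
        buffer.Add(item);
        return "buffered";
    }

    switch (policy)
    {
        case "Strict":
            // Fail fast rather than exceed the memory budget.
            throw new InvalidOperationException($"Materialization cap of {cap} reached.");
        case "WarnContinue":
            // Log, then keep buffering past the cap (assumed behavior).
            log.Add($"Materialization cap of {cap} exceeded; continuing.");
            buffer.Add(item);
            return "buffered";
        case "Degrade":
            // Stop buffering; map this item by its position in the stream instead.
            return "streamed";
        default:
            throw new ArgumentOutOfRangeException(nameof(policy));
    }
}

var buffer = new List<int>();
var warnings = new List<string>();
foreach (var next in new[] { 1, 2, 3 })
    Console.WriteLine(Materialize(buffer, next, 2, "Degrade", warnings));
// prints: buffered, buffered, streamed (one per line)
```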

Emission Options

| Setting | Default | Description |
|---|---|---|
| EnsurePerInputTerminalRecord | true | Guarantee a terminal record per input |
| EmitBackpressureDropRecords | true | Emit records when items are dropped by backpressure |
| IncludeContributorCorrelationIds | true | Include contributor IDs in join/aggregate records |
| EmitIntermediateNodeRecords | true | Emit records for intermediate (non-terminal) nodes |
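IncludeContributorCorrelationIds matters whenever one output is built from many inputs. The sketch below uses plain LINQ with hypothetical tuples, not NPipeline nodes, to show the idea. Each aggregate result carries the correlation IDs of every input that contributed to it, so lineage can trace a total back to all of its sources.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical inputs: each item carries the correlation ID assigned at its source.
var orders = new[]
{
    (CorrelationId: "c1", Customer: "acme", Amount: 10m),
    (CorrelationId: "c2", Customer: "acme", Amount: 5m),
    (CorrelationId: "c3", Customer: "globex", Amount: 7m),
};

// One output per group, listing the correlation IDs of every contributing input.
var totals = orders
    .GroupBy(o => o.Customer)
    .Select(g => (
        Customer: g.Key,
        Total: g.Sum(o => o.Amount),
        Contributors: g.Select(o => o.CorrelationId).ToArray()))
    .ToList();

foreach (var row in totals)
    Console.WriteLine($"{row.Customer}: {row.Total} <- [{string.Join(", ", row.Contributors)}]");
// prints:
// acme: 15 <- [c1, c2]
// globex: 7 <- [c3]
```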

Custom Lineage Sinks

Replace the default logging sink:

```csharp
services.AddNPipelineLineage<DatabaseLineageSink>();
```

Implement IPipelineLineageSink to store lineage reports:

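```csharp
using System.Threading;
using System.Threading.Tasks;

public class DatabaseLineageSink : IPipelineLineageSink
{
    public Task WriteAsync(PipelineLineageReport report, CancellationToken ct)
    {
        // Store the report in your lineage database here, passing ct to any
        // async I/O. Until then, return a completed task so the method compiles.
        return Task.CompletedTask;
    }
}
```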

Querying Lineage

Access the LineageCollector to query lineage data during or after execution:

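```csharp
var collector = context.LineageCollector;

// Lineage records for a single item, identified by its correlation ID
var history = collector.GetCorrelationHistory(correlationId);

// Correlations that have not been resolved
var unresolved = collector.GetUnresolvedCorrelations();

// Every lineage record collected so far
var allRecords = collector.GetAllRecords();
```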
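To reason about what the unresolved set contains, you can recompute it from raw records, as the following sketch does. It assumes, without confirmation from this page, that a correlation ID counts as unresolved until it has a terminal record. The (CorrelationId, Kind) tuples and the kind names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record stream: (correlation ID, record kind).
var seen = new List<(string CorrelationId, string Kind)>
{
    ("a", "Source"), ("a", "Transform"), ("a", "Terminal"),
    ("b", "Source"), ("b", "Transform"), // never reached a sink
    ("c", "Source"), ("c", "Error"),     // failed; no terminal record
};

// Assumption: an ID stays unresolved while none of its records is terminal.
IReadOnlyList<string> Unresolved(IEnumerable<(string CorrelationId, string Kind)> all) =>
    all.GroupBy(r => r.CorrelationId)
       .Where(g => g.All(r => r.Kind != "Terminal"))
       .Select(g => g.Key)
       .ToList();

Console.WriteLine(string.Join(", ", Unresolved(seen))); // prints: b, c
```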

Performance Tuning

Lineage adds per-item overhead proportional to the options enabled. To minimize impact:

  1. Increase SampleEvery - every 100th item is usually sufficient for monitoring
  2. Disable CaptureHopSnapshots - serialization is the most expensive operation
  3. Set RedactData = true - avoids storing large payloads
  4. Use LineageOptions.FastLineage as a starting point
  5. Set a MaterializationCap to bound memory usage
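A rough estimate of record volume shows how much these settings matter. This sketch assumes one lineage record per node hop, capped by MaxHopRecordsPerItem. Actual counts also depend on the emission options, so treat the results as an order of magnitude only.

```csharp
using System;

// Back-of-the-envelope lineage volume: sampled items x records per item.
// Assumes one record per hop, capped at maxHopRecordsPerItem.
long EstimateRecords(long items, int sampleEvery, int hopsPerItem, int maxHopRecordsPerItem)
{
    long sampled = (items + sampleEvery - 1) / sampleEvery; // ceil(items / sampleEvery)
    return sampled * Math.Min(hopsPerItem, maxHopRecordsPerItem);
}

// 1,000,000 items flowing through 6 nodes:
Console.WriteLine(EstimateRecords(1_000_000, 1, 6, 256));   // prints: 6000000 (every item)
Console.WriteLine(EstimateRecords(1_000_000, 100, 6, 256)); // prints: 60000 (every 100th item)
```

For the same run, moving from SampleEvery = 1 to SampleEvery = 100 cuts the estimate from 6,000,000 records to 60,000. That is why raising the sampling interval is the first item on the list.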


Released under the MIT License.