Lineage Configuration

This guide covers all configuration options for the NPipeline Lineage extension, including sampling strategies, data redaction, overflow policies, and custom sink registration.

LineageOptions

The LineageOptions class controls item-level lineage tracking behavior:

csharp
public sealed record LineageOptions
{
    /// Throw on cardinality mismatch instead of logging (default: false)
    public bool Strict { get; init; } = false;

    /// Log warning on mismatch when Strict=false (default: true)
    public bool WarnOnMismatch { get; init; } = true;

    /// Optional callback invoked on mismatch (default: null)
    public Action<LineageMismatchContext>? OnMismatch { get; init; } = null;

    /// Maximum items to materialize for lineage mapping (default: null = unbounded)
    public int? MaterializationCap { get; init; } = null;

    /// Behavior when materialization cap is exceeded (default: Degrade)
    public LineageOverflowPolicy OverflowPolicy { get; init; } = LineageOverflowPolicy.Degrade;

    /// Capture per-hop enter/exit timestamps (default: true)
    public bool CaptureHopTimestamps { get; init; } = true;

    /// Capture decision outcomes like Emitted, FilteredOut (default: true)
    public bool CaptureDecisions { get; init; } = true;

    /// Capture observed cardinality and counts (default: true)
    public bool CaptureObservedCardinality { get; init; } = true;

    /// Capture ancestry mapping when mapper is declared (default: false)
    public bool CaptureAncestryMapping { get; init; } = false;

    /// Capture per-hop input/output snapshots for Studio lineage diff visualization (default: false)
    public bool CaptureHopSnapshots { get; init; } = false;

    /// Sample every Nth item; 1 means all items (default: 100)
    public int SampleEvery { get; init; } = 100;

    /// Use deterministic (hash-based) sampling (default: true)
    public bool DeterministicSampling { get; init; } = true;

    /// Omit payload Data in LineageInfo records (default: true)
    public bool RedactData { get; init; } = true;

    /// Maximum hop records per item before truncation (default: 256)
    public int MaxHopRecordsPerItem { get; init; } = 256;
}
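MaxHopRecordsPerItem caps how many hop records a single item can accumulate. The sketch below shows one way such a per-item limit can work. The HopRecorder type is hypothetical. It is also an assumption that the earliest hops are kept and later ones are counted as dropped, because this page does not say which hops NPipeline retains.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a per-item hop limit in the spirit of MaxHopRecordsPerItem.
// Assumption: the first maxHops hops are kept and later hops are only counted.
public sealed class HopRecorder
{
    private readonly int _maxHops;
    private readonly List<string> _hops = new();

    public HopRecorder(int maxHops = 256)
    {
        if (maxHops < 1)
            throw new ArgumentOutOfRangeException(nameof(maxHops), "The hop limit must be at least 1.");

        _maxHops = maxHops;
    }

    public IReadOnlyList<string> Hops => _hops;

    // Set once the first hop beyond the limit is dropped.
    public bool Truncated { get; private set; }

    public int DroppedCount { get; private set; }

    public void Record(string nodeId)
    {
        if (_hops.Count < _maxHops)
        {
            _hops.Add(nodeId);
            return;
        }

        // Over the limit: drop the hop but remember that truncation happened.
        Truncated = true;
        DroppedCount++;
    }
}
```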

Sampling Configuration

Deterministic Sampling

Deterministic sampling uses a hash-based approach to select items consistently across runs:

csharp
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 100;  // Sample 1% of items
    options.DeterministicSampling = true;
});

When to Use:

  • Debugging specific issues (same items sampled across runs)
  • Compliance scenarios requiring consistent tracking
  • Reproducible testing

How It Works:

  • Computes a hash of the item's correlation ID
  • Samples items whose hash % SampleEvery == 0
  • Selects the same items on every run, provided their correlation IDs are identical across runs
  • Provides predictable, repeatable behavior
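Here is a minimal sketch of the hash-and-modulo rule above. It assumes the correlation ID is a Guid. DeterministicSampler and the choice of FNV-1a are illustrative and are not NPipeline's documented internals. Whatever hash is used must be stable across processes. string.GetHashCode() is randomized per process on .NET Core, so it would select different items on each run.

```csharp
using System;

// Hypothetical sketch of deterministic (hash-based) sampling; not NPipeline's actual code.
public static class DeterministicSampler
{
    // 32-bit FNV-1a over the correlation ID's 16 bytes. A fixed algorithm is used because
    // GetHashCode() carries no cross-process stability guarantee.
    public static uint StableHash(Guid correlationId)
    {
        const uint OffsetBasis = 2166136261;
        const uint Prime = 16777619;

        uint hash = OffsetBasis;
        foreach (byte b in correlationId.ToByteArray())
        {
            hash ^= b;
            hash = unchecked(hash * Prime);
        }
        return hash;
    }

    // An item is sampled when its stable hash is a multiple of sampleEvery.
    // sampleEvery = 1 selects every item because any value % 1 == 0.
    public static bool ShouldSample(Guid correlationId, int sampleEvery)
    {
        if (sampleEvery < 1)
            throw new ArgumentOutOfRangeException(nameof(sampleEvery), "SampleEvery must be at least 1.");

        return StableHash(correlationId) % (uint)sampleEvery == 0;
    }
}
```

Because the decision depends only on the ID, re-running a pipeline over items with the same correlation IDs samples exactly the same items.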

Random Sampling

Random sampling selects items at the specified rate without consistency across runs:

csharp
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 100;  // Sample 1% of items
    options.DeterministicSampling = false;
});

When to Use:

  • Monitoring and analytics (representative samples)
  • High-volume pipelines where consistency isn't required
  • Reducing overhead with minimal bias

How It Works:

  • Draws a random number for each item
  • Items selected with probability 1/SampleEvery
  • Different items may be sampled across runs
  • Provides statistically representative samples
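The sketch below is the random-sampling counterpart. RandomSampler is a hypothetical type and is not part of NPipeline. It shows how "probability 1/SampleEvery" can be realized with System.Random.

```csharp
using System;

// Hypothetical sketch of rate-based random sampling; not NPipeline's actual code.
public sealed class RandomSampler
{
    private readonly Random _random;
    private readonly int _sampleEvery;

    // Pass a seeded Random for reproducible tests; otherwise an unseeded generator is used.
    public RandomSampler(int sampleEvery, Random? random = null)
    {
        if (sampleEvery < 1)
            throw new ArgumentOutOfRangeException(nameof(sampleEvery), "SampleEvery must be at least 1.");

        _sampleEvery = sampleEvery;
        _random = random ?? new Random();
    }

    // Random.Next(n) returns a value in [0, n), so this is true with probability 1/SampleEvery.
    // System.Random is not thread-safe: use one sampler per thread or add locking.
    public bool ShouldSample() => _random.Next(_sampleEvery) == 0;
}
```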

Sampling Rate Guidelines

| Scenario | Recommended Rate | Reasoning |
|---|---|---|
| Production compliance | 100% (SampleEvery = 1) | Complete audit trails required |
| Production monitoring | 1-10% (SampleEvery = 10-100) | Balance visibility and overhead |
| Development/debugging | 10-50% (SampleEvery = 2-10) | Good visibility with manageable overhead |
| High-volume analytics | 0.1-1% (SampleEvery = 100-1000) | Representative samples, minimal overhead |

Data Redaction

Redaction excludes the item payload from lineage records and stores only metadata. It is enabled by default (RedactData = true):

csharp
builder.EnableItemLevelLineage(options =>
{
    options.RedactData = true;
});

When to Use:

  • Sensitive data (PII, financial data, health records)
  • Large data objects (reduces memory usage)
  • Focus on flow patterns rather than data values

Impact:

  • LineageInfo.Data field will be null
  • All other lineage metadata preserved
  • Reduces memory usage by not storing actual data
  • No impact on tracking accuracy
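Conceptually, redaction behaves like copying the lineage record with only the payload cleared. The sketch below models this with a hypothetical LineageInfoSketch record that holds just the fields used on this page. NPipeline's LineageInfo has its own shape. NPipeline may also skip capturing the payload altogether rather than clearing it afterward.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for LineageInfo, limited to the fields shown on this page.
public sealed record LineageInfoSketch(Guid CorrelationId, object? Data, IReadOnlyList<string> TraversalPath);

public static class RedactionSketch
{
    // The non-destructive 'with' copy clears Data and keeps every other field unchanged.
    public static LineageInfoSketch Redact(LineageInfoSketch info) => info with { Data = null };
}
```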

Example:

csharp
// Pipeline run with RedactData = false
var lineageInfo = collector.GetLineageInfo(correlationId);
Console.WriteLine(lineageInfo.Data);  // Outputs the item's payload

// Pipeline run with RedactData = true (the default)
var redactedInfo = collector.GetLineageInfo(correlationId);
Console.WriteLine(redactedInfo.Data);  // Outputs an empty line: Data is null
Console.WriteLine(redactedInfo.CorrelationId);  // Still available
Console.WriteLine(string.Join(" -> ", redactedInfo.TraversalPath));  // Still available

Hop Snapshots

When CaptureHopSnapshots is enabled, each LineageHop record includes a before/after snapshot of the item at that node, serialized as a JsonElement. This powers NPipeline Studio's lineage diff view.

csharp
builder.EnableItemLevelLineage(options =>
{
    options.CaptureHopSnapshots = true;  // Default: false
});

Snapshots are produced by JSON-serializing the raw item value. Circular references in the object graph are silently omitted rather than causing a serialization failure. If serialization fails entirely (e.g., the type is not serializable), the snapshot falls back to value.ToString().
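These capture rules map closely onto System.Text.Json options. The sketch below (hypothetical HopSnapshotSketch, .NET 6 or later) uses ReferenceHandler.IgnoreCycles, which writes null in place of a reference that loops back to an object already being serialized. When serialization throws, it falls back to ToString(). This is an approximation of the described behavior, not NPipeline's implementation.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical approximation of hop snapshot capture; not NPipeline's implementation.
public static class HopSnapshotSketch
{
    // IgnoreCycles writes null for a back-reference instead of throwing a cycle error.
    private static readonly JsonSerializerOptions Options = new()
    {
        ReferenceHandler = ReferenceHandler.IgnoreCycles
    };

    public static JsonElement Capture<T>(T value)
    {
        try
        {
            // Serialize the value into a JsonElement snapshot.
            return JsonSerializer.SerializeToElement(value, Options);
        }
        catch (Exception)
        {
            // Any serialization failure: store the value's ToString() as a JSON string.
            return JsonSerializer.SerializeToElement(value is null ? null : value.ToString(), Options);
        }
    }
}

// Example type with a self-reference, used to demonstrate cycle handling.
public sealed class Node
{
    public string Name { get; set; } = "";
    public Node? Next { get; set; }
}
```

For a Node whose Next points back to itself, the snapshot contains the Name value with Next written as null, and no exception is raised.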

When to Enable:

  • Debugging: See exactly how each node transforms each item
  • NPipeline Studio lineage inspector and diff view
  • Compliance workflows requiring full value-level audit trails

Performance Impact:

Enabling hop snapshots adds a JSON serialize/deserialize round-trip per hop for every sampled item. In high-throughput pipelines that use this option, keep SampleEvery at 100 or higher (sampling 1% of items or fewer).

Overflow Policies

The MaterializationCap option limits memory usage by capping how many items are materialized for lineage mapping:

csharp
builder.EnableItemLevelLineage(options =>
{
    options.MaterializationCap = 10000;  // Default is null (unbounded)
});

Adjust Based On:

  • Available memory
  • Pipeline throughput
  • Required visibility
  • Sampling rate

Overflow Policy Options

Degrade (Default)

Switches to streaming positional mapping when the cap is exceeded:

csharp
options.OverflowPolicy = LineageOverflowPolicy.Degrade;  // Default

Characteristics:

  • Continues processing when cap exceeded
  • Switches to streaming positional mapping
  • Best balance of visibility and memory safety

When to Use:

  • Production pipelines with sampling
  • Most production use cases
  • When you want graceful degradation

Strict

Throws an exception as soon as the cap is exceeded:

csharp
options.OverflowPolicy = LineageOverflowPolicy.Strict;

Characteristics:

  • Throws exception when cap exceeded
  • Strict enforcement of memory limits
  • Fail-fast behavior

When to Use:

  • When memory limits must be strictly enforced
  • Testing and validation scenarios
  • When you need to know whether the cap is reached

WarnContinue

Emits warnings when the cap is exceeded and continues processing:

csharp
options.OverflowPolicy = LineageOverflowPolicy.WarnContinue;

Characteristics:

  • Logs warnings when cap exceeded
  • May continue materializing (risk of memory growth)
  • Provides visibility into overflow situations

When to Use:

  • Development and debugging
  • When you want visibility without failing
  • Monitoring memory behavior
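The three policies differ only in what happens when an item arrives after the cap has been reached. The sketch below models that decision. CappedMaterializer and OverflowPolicySketch are hypothetical stand-ins. The Degrade branch here just stops buffering and does not perform NPipeline's streaming positional mapping. WarnContinue warns only once in this sketch, since this page does not say how often NPipeline logs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mirror of the policy names; not NPipeline's LineageOverflowPolicy type.
public enum OverflowPolicySketch { Degrade, Strict, WarnContinue }

// Hypothetical sketch: buffers items up to a cap, then applies the chosen policy.
public sealed class CappedMaterializer<T>
{
    private readonly int? _cap;
    private readonly OverflowPolicySketch _policy;
    private readonly Action<string> _warn;
    private readonly List<T> _buffer = new();

    public CappedMaterializer(int? cap, OverflowPolicySketch policy, Action<string>? warn = null)
    {
        _cap = cap;
        _policy = policy;
        _warn = warn ?? (_ => { });
    }

    public IReadOnlyList<T> Buffered => _buffer;

    // True once Degrade has stopped materializing; a real implementation would map
    // the remaining items positionally as they stream through.
    public bool Degraded { get; private set; }

    public int WarningCount { get; private set; }

    public void Add(T item)
    {
        if (Degraded)
            return; // Streaming mode: nothing further is buffered.

        if (_cap is null || _buffer.Count < _cap.Value)
        {
            _buffer.Add(item);
            return;
        }

        switch (_policy)
        {
            case OverflowPolicySketch.Strict:
                throw new InvalidOperationException($"Lineage materialization cap of {_cap.Value} exceeded.");

            case OverflowPolicySketch.WarnContinue:
                if (WarningCount == 0)
                {
                    WarningCount++;
                    _warn($"Lineage materialization cap of {_cap.Value} exceeded; continuing to buffer.");
                }
                _buffer.Add(item); // Memory keeps growing past the cap.
                break;

            case OverflowPolicySketch.Degrade:
                Degraded = true;
                break;
        }
    }
}
```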

Choosing an Overflow Policy

| Scenario | Policy | Reasoning |
|---|---|---|
| Production pipelines | Degrade | Safe default, graceful degradation |
| Memory-constrained | Strict | Enforce limits strictly |
| Development/debugging | WarnContinue | Visibility without failing |
| Compliance scenarios | Degrade | Ensures continued processing |

Custom Sink Registration

Register custom lineage sinks to export data to external systems:

Built-in Logging Sink

csharp
builder.UseLoggingPipelineLineageSink();

Custom ILineageSink

Implement for item-level lineage export:

csharp
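using System;
using System.Data;
using System.Threading;
using System.Threading.Tasks;
using Dapper; // provides the ExecuteAsync extension method and CommandDefinition

public sealed class DatabaseLineageSink : ILineageSink
{
    private readonly IDbConnection _connection;

    public DatabaseLineageSink(IDbConnection connection)
    {
        _connection = connection;
    }

    public async Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
    {
        const string sql = @"
            INSERT INTO Lineage (CorrelationId, Data, TraversalPath, Timestamp)
            VALUES (@CorrelationId, @Data, @TraversalPath, @Timestamp)";

        var parameters = new
        {
            CorrelationId = lineageInfo.CorrelationId,
            Data = lineageInfo.Data?.ToString(),  // null when RedactData = true
            TraversalPath = string.Join(",", lineageInfo.TraversalPath),
            Timestamp = DateTime.UtcNow
        };

        // Dapper's ExecuteAsync(sql, param, ...) overload has no CancellationToken
        // parameter, so the token is passed through a CommandDefinition instead.
        await _connection.ExecuteAsync(
            new CommandDefinition(sql, parameters, cancellationToken: cancellationToken));
    }
}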

Custom IPipelineLineageSink

Implement for pipeline-level lineage export:

csharp
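using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public sealed class JsonFileLineageSink : IPipelineLineageSink
{
    // Reuse one options instance: JsonSerializerOptions caches serialization
    // metadata, so creating a new instance on every call repeats that work.
    private static readonly JsonSerializerOptions SerializerOptions = new() { WriteIndented = true };

    private readonly string _filePath;

    public JsonFileLineageSink(string filePath)
    {
        _filePath = filePath;
    }

    public async Task RecordAsync(PipelineLineageReport report, CancellationToken cancellationToken)
    {
        var json = JsonSerializer.Serialize(report, SerializerOptions);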
        
        await File.WriteAllTextAsync(_filePath, json, cancellationToken);
    }
}
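File.WriteAllTextAsync replaces the file's contents, so JsonFileLineageSink keeps only the most recent report. If you need a history across runs, one option is to append each report as a single line (the JSON Lines format). JsonLinesAppender below is a hypothetical helper and is not part of NPipeline:

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of NPipeline): appends one compact JSON object per line.
public static class JsonLinesAppender
{
    public static Task AppendAsync<T>(string filePath, T report, CancellationToken cancellationToken = default)
    {
        // Default (non-indented) serializer output contains no line breaks, so each
        // report occupies exactly one line.
        string line = JsonSerializer.Serialize(report) + Environment.NewLine;

        // Creates the file if it does not exist; concurrent writers are not coordinated here.
        return File.AppendAllTextAsync(filePath, line, cancellationToken);
    }
}
```

You could call JsonLinesAppender.AppendAsync(_filePath, report, cancellationToken) inside RecordAsync in place of the WriteAllTextAsync call to keep one line per report. Appends from overlapping pipeline runs are not synchronized, so use one file per pipeline or add locking if runs can overlap.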

Register Custom Sinks via DI

csharp
// Register with custom sink type
services.AddNPipelineLineage<DatabaseLineageSink>();

// Register with factory delegate
services.AddNPipelineLineage(sp =>
{
    var connectionString = sp.GetRequiredService<IConfiguration>()["ConnectionStrings:Lineage"];
    var connection = new SqlConnection(connectionString);
    return new DatabaseLineageSink(connection);
});

// Register both item and pipeline sinks
services.AddNPipelineLineage<DatabaseLineageSink, JsonFileLineageSink>();

Dependency Injection Configuration

Default Registration

csharp
services.AddNPipelineLineage();

Registers:

  • ILineageCollector (scoped)
  • ILineageSink → LoggingPipelineLineageSink (transient)
  • IPipelineLineageSink → LoggingPipelineLineageSink (transient)

Custom Sink Registration

csharp
services.AddNPipelineLineage<CustomLineageSink>();

Factory-Based Registration

csharp
services.AddNPipelineLineage(sp =>
{
    var logger = sp.GetRequiredService<ILogger<CustomLineageSink>>();
    var config = sp.GetRequiredService<IConfiguration>();
    return new CustomLineageSink(logger, config);
});

Custom Collector Registration

csharp
services.AddNPipelineLineage<CustomCollector, LoggingPipelineLineageSink, LoggingPipelineLineageSink>();

Configuration Examples

Production-Ready Configuration

csharp
services.AddNPipelineLineage<DatabaseLineageSink>();

// In pipeline
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 100;  // 1% sampling (default)
    options.DeterministicSampling = true;  // Default
    options.RedactData = true;  // Don't store sensitive data (default)
    options.MaterializationCap = 10000;
    options.OverflowPolicy = LineageOverflowPolicy.Degrade;  // Default
});

Development Configuration

csharp
services.AddNPipelineLineage();  // Use logging sink

// In pipeline
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1;  // Track everything
    options.DeterministicSampling = true;
    options.RedactData = false;  // Keep data for debugging
    options.MaterializationCap = null;  // No cap (unbounded)
    options.OverflowPolicy = LineageOverflowPolicy.WarnContinue;
});

High-Volume Monitoring Configuration

csharp
services.AddNPipelineLineage<PrometheusLineageSink>();

// In pipeline
builder.EnableItemLevelLineage(options =>
{
    options.SampleEvery = 1000;  // 0.1% sampling
    options.DeterministicSampling = false;  // Random sampling
    options.RedactData = true;  // Minimal memory (default)
    options.MaterializationCap = 1000;  // Small cap
    options.OverflowPolicy = LineageOverflowPolicy.Degrade;  // Graceful degradation
});

Best Practices

1. Start with Conservative Sampling

Begin with 1-10% sampling in production:

csharp
options.SampleEvery = 100;  // Conservative start

Adjust based on requirements and performance impact.
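Each item is sampled at a rate of 1/SampleEvery. That makes it simple arithmetic to turn a target percentage into a SampleEvery value, or to estimate how many lineage records a run will produce. SamplingMath is a hypothetical helper written for illustration:

```csharp
using System;

// Hypothetical helper (not part of NPipeline) based on rate = 1 / SampleEvery.
public static class SamplingMath
{
    // Percentage of items sampled for a given SampleEvery value.
    public static double PercentFor(int sampleEvery) => 100.0 / sampleEvery;

    // SampleEvery value closest to a target percentage in (0, 100].
    public static int SampleEveryFor(double percent)
    {
        if (percent <= 0 || percent > 100)
            throw new ArgumentOutOfRangeException(nameof(percent), "Percent must be in (0, 100].");

        return (int)Math.Round(100.0 / percent);
    }

    // Expected number of sampled items for a run of the given size.
    public static double ExpectedSampled(long totalItems, int sampleEvery) => (double)totalItems / sampleEvery;
}
```

For example, SampleEveryFor(1) returns 100, and a run of 1,000,000 items at SampleEvery = 100 is expected to yield about 10,000 sampled items.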

2. Use Deterministic Sampling for Debugging

When investigating specific issues:

csharp
options.DeterministicSampling = true;
options.SampleEvery = 1;  // Track all items temporarily

3. Enable Redaction for Sensitive Data

Always redact PII, financial data, or health records:

csharp
options.RedactData = true;

4. Use Degrade Policy in Production

The default Degrade policy provides the best balance of visibility and memory safety:

csharp
options.OverflowPolicy = LineageOverflowPolicy.Degrade;

5. Implement Async Sinks

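Use asynchronous I/O in custom sinks, and pass the CancellationToken through rather than blocking with .Result or .Wait():

csharp
public async Task RecordAsync(LineageInfo lineageInfo, CancellationToken cancellationToken)
{
    // Assumes _database is an EF Core DbContext; ToEntity is a hypothetical mapping helper.
    _database.Add(ToEntity(lineageInfo));
    await _database.SaveChangesAsync(cancellationToken);
}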

Released under the MIT License.