SQL Server Connector

The NPipeline.Connectors.SqlServer package provides source and sink nodes for SQL Server. It supports connection pooling, parameterized queries, batched inserts, high-throughput writes via SqlBulkCopy, and MERGE-based upserts.

Installation

bash
dotnet add package NPipeline.Connectors.SqlServer

Dependencies: Microsoft.Data.SqlClient 7.x

Source Node - SqlServerSourceNode<T>

Constructors

csharp
// Connection string + query
public SqlServerSourceNode(
    string connectionString, string query,
    SqlServerConfiguration? configuration = null)

// With custom mapper
public SqlServerSourceNode(
    string connectionString, string query,
    Func<SqlServerRow, T>? customMapper = null,
    SqlServerConfiguration? configuration = null)

// Connection pool (recommended for DI)
public SqlServerSourceNode(
    ISqlServerConnectionPool connectionPool, string query,
    SqlServerConfiguration? configuration = null,
    DatabaseParameter[]? parameters = null,
    bool continueOnError = false,
    string? connectionName = null)

Example

csharp
var source = new SqlServerSourceNode<Order>(
    "Server=localhost;Database=Sales;Trusted_Connection=true;",
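    // Note: @status must be bound to a value before the query runs. Of the constructors
    // listed above, only the connection-pool overload accepts a parameters array.
    "SELECT Id, Customer, Amount FROM dbo.Orders WHERE Status = @status",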
    configuration: new SqlServerConfiguration
    {
        StreamResults = true,
        FetchSize = 5000
    });

Sink Node - SqlServerSinkNode<T>

| Strategy | Description | Best For |
| --- | --- | --- |
| PerRow | Individual INSERT per item | Small volumes |
| Batch (default) | Batched INSERT statements | Most workloads |
| BulkCopy | SqlBulkCopy | Maximum throughput |

Constructors

csharp
// Connection string
public SqlServerSinkNode(
    string connectionString, string tableName,
    SqlServerConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null)

// Connection pool (recommended for DI)
public SqlServerSinkNode(
    ISqlServerConnectionPool connectionPool, string tableName,
    SqlServerConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null,
    string? connectionName = null)

Example: Bulk Copy

csharp
var config = new SqlServerConfiguration
{
    ConnectionString = "Server=localhost;Database=Sales;...",
    WriteStrategy = SqlServerWriteStrategy.BulkCopy,
    BulkCopyBatchSize = 5000,
    EnableStreaming = true
};

// Reuse the connection string from the configuration rather than a separate placeholder.
var sink = new SqlServerSinkNode<Order>(config.ConnectionString, "dbo.Orders", configuration: config);

Configuration

Connection

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ConnectionString | string | "" | SQL Server connection string |
| Schema | string | "dbo" | Default schema |
| CommandTimeout | int | 30 | Command timeout (seconds) |
| ConnectionTimeout | int | 15 | Connection timeout (seconds) |
| MinPoolSize | int | 1 | Minimum connection pool size |
| MaxPoolSize | int | 100 | Maximum connection pool size |

Write

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| WriteStrategy | SqlServerWriteStrategy | Batch | PerRow, Batch, or BulkCopy |
| BatchSize | int | 100 | Items per batch |
| MaxBatchSize | int | 1000 | Maximum batch size |
| UseTransaction | bool | true | Wrap writes in a transaction |
| UsePreparedStatements | bool | true | Use prepared statements |

Bulk Copy

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| BulkCopyBatchSize | int | 5000 | Rows per bulk copy batch |
| BulkCopyTimeout | int | 300 | Bulk copy timeout (seconds) |
| BulkCopyNotifyAfter | int | 1000 | Progress notification interval (rows) |
| EnableStreaming | bool | true | Stream bulk copy data |

Upsert (MERGE)

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| UseUpsert | bool | false | Enable MERGE upserts |
| UpsertKeyColumns | string[]? | null | Key columns for MERGE matching |
| OnMergeAction | OnMergeAction | Update | Update, Ignore, or Delete |
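
A configuration fragment for an upserting sink might look like the following. It uses only the properties listed in the table above; the key column name OrderId and the exact enum-member spelling OnMergeAction.Update are assumptions inferred from the values shown there.

```csharp
var config = new SqlServerConfiguration
{
    UseUpsert = true,
    // Assumed key column: incoming rows are matched to existing rows on OrderId.
    UpsertKeyColumns = new[] { "OrderId" },
    // Assumed member name, taken from the "Update, Ignore, or Delete" values above.
    OnMergeAction = OnMergeAction.Update
};
```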

Error Handling

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ContinueOnError | bool | false | Continue on row-level errors |
| MaxRetryAttempts | int | 3 | Retry attempts for transient errors |
| RetryDelay | TimeSpan | - | Delay between retries |
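
As a small illustration, these settings could be combined as follows. The values are arbitrary examples, not recommendations, and the default RetryDelay is not listed in the table.

```csharp
var config = new SqlServerConfiguration
{
    ContinueOnError = true,               // continue on row-level errors
    MaxRetryAttempts = 5,                 // default is 3
    RetryDelay = TimeSpan.FromSeconds(2)  // illustrative delay between retries
};
```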

Dependency Injection

csharp
services.AddSqlServerConnector(options =>
{
    options.DefaultConnectionString = "Server=localhost;Database=Sales;...";
    options.DefaultConfiguration = new SqlServerConfiguration
    {
        WriteStrategy = SqlServerWriteStrategy.BulkCopy,
        BulkCopyBatchSize = 5000
    };
});

// Named connections for multi-database scenarios
services.AddSqlServerConnection("reporting", "Server=reporting-db;...");
services.AddSqlServerConnection("warehouse", "Server=warehouse-db;...");

Registers ISqlServerConnectionPool, SqlServerSourceNodeFactory, and SqlServerSinkNodeFactory.
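
Once registered, the pool can be resolved and passed to the connection-pool constructors shown earlier. The sketch below assumes the standard Microsoft.Extensions.DependencyInjection container, a hypothetical Order type, and that the connectionName argument selects one of the named connections registered above. The NPipeline namespace in the using directive is also an assumption.

```csharp
using Microsoft.Extensions.DependencyInjection;
using NPipeline.Connectors.SqlServer; // assumed namespace, inferred from the package name

// 'services' is the IServiceCollection configured above.
var provider = services.BuildServiceProvider();
var pool = provider.GetRequiredService<ISqlServerConnectionPool>();

// Read from the "reporting" connection using the pool-based constructor.
var source = new SqlServerSourceNode<Order>(
    pool,
    "SELECT Id, Customer, Amount FROM dbo.Orders",
    connectionName: "reporting");
```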

Attribute Mapping

Convention-Based

C# PascalCase property names map directly to SQL Server PascalCase column names (no conversion).
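
As a minimal sketch of the convention, a class like the one below would need no attributes if the table's columns are named Id, Customer, and Amount, as in the SELECT list of the source example. The Order type is never defined on this page, so its shape and property types here are assumptions.

```csharp
// Hypothetical Order type: property names and types are assumed for illustration.
public class Order
{
    public int Id { get; set; }                 // maps to column Id
    public string Customer { get; set; } = "";  // maps to column Customer
    public decimal Amount { get; set; }         // maps to column Amount
}
```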

[Column] / [IgnoreColumn] (Cross-Connector)

csharp
using NPipeline.Connectors.Attributes;

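public class Customer
{
    [Column("CustomerID")]
    public int CustomerId { get; set; }

    public string FirstName { get; set; } = "";
    public string LastName { get; set; } = "";

    // Computed property: excluded from mapping.
    [IgnoreColumn]
    public string FullName => $"{FirstName} {LastName}";
}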

[SqlServerColumn] (Connector-Specific)

Extends [Column] with SQL Server features:

csharp
using System.Data; // SqlDbType
using NPipeline.Connectors.SqlServer.Mapping;

public class Customer
{
    [SqlServerColumn("CustomerID", PrimaryKey = true, Identity = true)]
    public int CustomerId { get; set; }

    [SqlServerColumn("FirstName", DbType = SqlDbType.NVarChar, Size = 100)]
    public string FirstName { get; set; } = "";

    [SqlServerColumn("Email", DbType = SqlDbType.NVarChar, Size = 255)]
    public string Email { get; set; } = "";
}

| Property | Description |
| --- | --- |
| Name | Column name in the database |
| DbType | SQL Server data type (SqlDbType) |
| Size | Size/length for character and numeric types |
| PrimaryKey | Primary key (used for checkpointing) |
| Identity | Auto-increment identity column |
| Ignore | Skip mapping this property |

Use the cross-connector attributes ([Column], [IgnoreColumn]) for portable code; use [SqlServerColumn] when you need control over the SQL type, primary key, or identity behavior.

Delivery Semantics

| Semantic | Data Loss | Duplicates | Overhead | Use Case |
| --- | --- | --- | --- | --- |
| AtLeastOnce (default) | No | Possible | Low | Idempotent operations |
| AtMostOnce | Possible | No | Low | Telemetry, metrics |
| ExactlyOnce | No | No | High | Financial transactions |

csharp
var config = new SqlServerConfiguration
{
    DeliverySemantic = DeliverySemantic.ExactlyOnce,
    UseTransaction = true,
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointStorage = new FileCheckpointStorage("checkpoints.json")
};

Checkpointing

Checkpointing enables pipelines to resume from where they left off after a failure.

| Strategy | Persistence | Description |
| --- | --- | --- |
| None (default) | - | No checkpointing; restart from beginning on failure |
| InMemory | Process lifetime | Recover from transient failures within a single run |
| Offset | External storage | Track position via monotonically increasing column |
| KeyBased | External storage | Track processed items by composite keys |
| Cursor | External storage | Track cursor position for iteration |
| CDC | External storage | Track LSN for SQL Server Change Data Capture |

Offset Example

csharp
var config = new SqlServerConfiguration
{
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointOffsetColumn = "OrderId",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/orders.json")
};

var source = new SqlServerSourceNode<Order>(connectionString,
    "SELECT * FROM Orders WHERE OrderId > @lastCheckpoint ORDER BY OrderId",
    configuration: config);

CDC Example

csharp
var config = new SqlServerConfiguration
{
    CheckpointStrategy = CheckpointStrategy.CDC,
    CdcCaptureInstance = "dbo_orders",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/cdc.json")
};

Requires CDC enabled on the database and table:

sql
EXEC sys.sp_cdc_enable_db;
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'orders', @role_name = NULL;

Checkpoint Intervals

csharp
config.CheckpointInterval = new CheckpointIntervalConfiguration
{
    RowCountInterval = 10_000,
    TimeInterval = TimeSpan.FromMinutes(5)
};

Mapping

| Property | Default | Description |
| --- | --- | --- |
| CaseInsensitiveMapping | true | Match OrderId, orderid, ORDERID to same property |
| CacheMappingMetadata | true | Cache mapping delegates per type (avoid repeated reflection) |
| ValidateIdentifiers | true | Validate SQL identifiers to prevent injection |
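
To illustrate what identifier validation guards against, here is a minimal sketch of an allow-list check. It is not the connector's implementation, whose exact rules are not documented here; the IdentifierCheck class and its pattern are hypothetical. Table and column names cannot be passed as query parameters, so they have to be checked before being placed into SQL text.

```csharp
using System.Linq;
using System.Text.RegularExpressions;

public static class IdentifierCheck
{
    // ASCII letters, digits, and underscores only; must not start with a digit.
    // \z anchors at the true end of the string (unlike $, it rejects a trailing newline).
    private static readonly Regex Part = new Regex(@"^[A-Za-z_][A-Za-z0-9_]*\z");

    // Accepts one- to three-part names such as "Orders" or "dbo.Orders".
    public static bool IsSafe(string identifier)
    {
        if (string.IsNullOrEmpty(identifier)) return false;
        var parts = identifier.Split('.');
        return parts.Length <= 3 && parts.All(p => Part.IsMatch(p));
    }
}
```

With this check, IdentifierCheck.IsSafe("dbo.Orders") is accepted, while a value such as "Orders; DROP TABLE Users" is rejected because of the spaces and semicolon.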

Performance

Streaming

csharp
var config = new SqlServerConfiguration { StreamResults = true, FetchSize = 1_000 };

Without streaming, the entire result set is loaded into memory. With streaming, rows are fetched in batches of FetchSize.

| FetchSize | Best For |
| --- | --- |
| 100–500 | Memory-constrained, wide rows |
| 1,000–5,000 | Most workloads |
| 5,000–10,000 | Maximum throughput, high-bandwidth |
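
The memory difference between the two modes can be pictured with a generic sketch. This is a conceptual illustration using plain iterators, not the connector's internals; StreamingSketch and its methods are hypothetical names, and integers stand in for rows.

```csharp
using System;
using System.Collections.Generic;

public static class StreamingSketch
{
    // Buffered: every row is materialized before the caller sees the first one.
    public static List<int> ReadBuffered(IEnumerable<int> rows) => new List<int>(rows);

    // Streamed: rows are handed over in batches of fetchSize, so only one batch
    // needs to be held in memory at a time.
    public static IEnumerable<IReadOnlyList<int>> ReadStreamed(IEnumerable<int> rows, int fetchSize)
    {
        if (fetchSize <= 0) throw new ArgumentOutOfRangeException(nameof(fetchSize));

        var batch = new List<int>(fetchSize);
        foreach (var row in rows)
        {
            batch.Add(row);
            if (batch.Count == fetchSize)
            {
                yield return batch;
                batch = new List<int>(fetchSize); // start a fresh batch
            }
        }
        if (batch.Count > 0) yield return batch;  // final partial batch
    }
}
```

A smaller fetchSize lowers peak memory at the cost of more round trips, which is the trade-off the table above describes.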

Write Strategy Comparison

| Strategy | Throughput | Latency | Error Isolation | Use Case |
| --- | --- | --- | --- | --- |
| PerRow | Low | Low | High | Real-time, per-row errors |
| Batch | High | Medium | Medium | ETL, balanced |
| BulkCopy | Very High | High | Low | Bulk loads, data warehouse |

Batch Size Guidelines

| Range | Best For |
| --- | --- |
| 100–500 | Real-time processing, low latency |
| 500–1,000 | Balanced throughput and latency |
| 1,000–5,000 | Bulk loading |

Note: Effective batch size is capped by SQL Server's 2,100 parameter limit divided by the number of mapped columns.
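
The cap described in the note can be worked out with simple arithmetic. The helper below is a hypothetical sketch of that calculation, assuming one parameter per mapped column per row; it is not part of the connector's API.

```csharp
using System;

public static class BatchSizing
{
    private const int MaxSqlParameters = 2100; // limit stated in the note above

    // Returns the requested batch size, reduced if it would exceed the parameter limit.
    public static int EffectiveBatchSize(int requestedBatchSize, int mappedColumns)
    {
        if (requestedBatchSize <= 0) throw new ArgumentOutOfRangeException(nameof(requestedBatchSize));
        if (mappedColumns <= 0) throw new ArgumentOutOfRangeException(nameof(mappedColumns));

        int maxRows = MaxSqlParameters / mappedColumns; // integer division rounds down
        return Math.Min(requestedBatchSize, maxRows);
    }
}
```

For a 12-column type, 2100 / 12 = 175, so a requested batch size of 1,000 would be reduced to 175 rows. For an 8-column type the cap is 262 rows, so the default batch size of 100 is unaffected.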

Prepared Statements

Enabled by default (UsePreparedStatements = true). Reduces query parsing overhead by 10–30% for repeated inserts.

Row-Level Error Handling

csharp
var config = new SqlServerConfiguration
{
    RowErrorHandler = (exception, row) =>
    {
        Console.WriteLine($"Error on row {row?.Get<int>("OrderId")}: {exception.Message}");
        return exception is FormatException; // true = skip row, false = re-throw
    }
};

Best Practices

  1. Use DI with AddSqlServerConnector for production - centralizes connection management
  2. Enable streaming (StreamResults = true) for large result sets
  3. Use BulkCopy for bulk loading - significantly faster than Batch
  4. Enable upsert for idempotent writes - replayed or retried items then update existing rows instead of creating duplicates
  5. Validate identifiers - never disable ValidateIdentifiers in production
  6. Use prepared statements for repeated query patterns
  7. Configure checkpointing for long-running pipelines
  8. Tune batch size based on latency/throughput requirements
  9. Set ApplicationName for monitoring in SQL Server Activity Monitor
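
For item 9, one way to set the application name is through the connection string itself, using SqlConnectionStringBuilder from Microsoft.Data.SqlClient. Whether SqlServerConfiguration exposes its own ApplicationName property is not documented on this page, so this sketch goes through the connection string; the name "OrdersEtl" is a placeholder.

```csharp
using Microsoft.Data.SqlClient;

var builder = new SqlConnectionStringBuilder(
    "Server=localhost;Database=Sales;Trusted_Connection=true;")
{
    ApplicationName = "OrdersEtl" // placeholder; shown for the session in SQL Server monitoring tools
};

// Pass this string wherever a connection string is expected.
string connectionString = builder.ConnectionString;
```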

Released under the MIT License.