PostgreSQL Connector

The NPipeline.Connectors.Postgres package provides source and sink nodes for PostgreSQL. It supports connection pooling, parameterized queries, batch inserts, high-performance COPY operations, and upserts with ON CONFLICT.

Installation

bash
dotnet add package NPipeline.Connectors.Postgres

Dependencies: Npgsql 10.x

Source Node - PostgresSourceNode<T>

Reads rows from a SQL query and emits each as an item of type T.

Constructors

csharp
// SQL query + configuration (the configuration carries the connection string)
public PostgresSourceNode(
    string sql,
    PostgresConfiguration configuration,
    Func<PostgresRow, T>? rowMapper = null)

// NpgsqlDataSource (recommended for DI)
public PostgresSourceNode(
    string sql,
    NpgsqlDataSource dataSource,
    Func<PostgresRow, T>? rowMapper = null)

// StorageUri-based (multi-tenant scenarios)
public PostgresSourceNode(
    StorageUri uri, string query,
    IStorageResolver? resolver = null,
    Func<PostgresRow, T>? rowMapper = null,
    PostgresConfiguration? configuration = null)

Example

csharp
var config = new PostgresConfiguration
{
    ConnectionString = "Host=localhost;Database=orders;Username=app;Password=secret"
};

var source = new PostgresSourceNode<Order>(
    "SELECT id, customer, amount FROM orders WHERE status = 'pending'",
    config,
    row => new Order(
        row.Get<int>("id"),
        row.Get<string>("customer") ?? "",
        row.Get<decimal>("amount")));

Sink Node - PostgresSinkNode<T>

Writes items to a PostgreSQL table. Supports three write strategies:

| Strategy | Description | Best For |
| --- | --- | --- |
| PerRow | Individual INSERT per item | Small volumes, maximum control |
| Batch (default) | Batched INSERT statements | Most workloads |
| Copy | PostgreSQL COPY protocol | Maximum throughput (bulk loads) |

Constructors

csharp
// Connection string
public PostgresSinkNode(
    string connectionString, string tableName,
    PostgresWriteStrategy writeStrategy = PostgresWriteStrategy.Batch,
    Func<T, IEnumerable<DatabaseParameter>>? parameterMapper = null,
    PostgresConfiguration? configuration = null,
    string? schema = null)

// Connection pool (recommended for DI)
public PostgresSinkNode(
    IPostgresConnectionPool connectionPool, string tableName,
    PostgresWriteStrategy writeStrategy = PostgresWriteStrategy.Batch,
    Func<T, IEnumerable<DatabaseParameter>>? parameterMapper = null,
    PostgresConfiguration? configuration = null,
    string? schema = null,
    string? connectionName = null)

Example: Batch Upsert

csharp
var config = new PostgresConfiguration
{
    ConnectionString = "Host=localhost;Database=orders;Username=app;Password=secret",
    WriteStrategy = PostgresWriteStrategy.Batch,
    BatchSize = 1000,
    UseUpsert = true,
    UpsertConflictColumns = new[] { "id" },
    OnConflictAction = OnConflictAction.Update
};

// Pass the same connection string that the configuration carries.
var sink = new PostgresSinkNode<Order>(config.ConnectionString, "orders", configuration: config);

Configuration

Connection

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ConnectionString | string | "" | PostgreSQL connection string |
| Schema | string | "public" | Default schema |
| CommandTimeout | int | 30 | Command timeout (seconds) |
| ConnectionTimeout | int | 15 | Connection timeout (seconds) |
| CopyTimeout | int | 300 | COPY operation timeout (seconds) |
| MinPoolSize | int | 5 | Minimum connection pool size |
| MaxPoolSize | int | 50 | Maximum connection pool size |
| UseSslMode | bool | false | Enable SSL |
| ReadBufferSize | int | 8192 | Read buffer size (bytes) |

Write

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| WriteStrategy | PostgresWriteStrategy | Batch | PerRow, Batch, or Copy |
| BatchSize | int | 1000 | Items per batch |
| MaxBatchSize | int | 5000 | Maximum batch size |
| UseTransaction | bool | true | Wrap writes in a transaction |
| UseBinaryCopy | bool | false | Use binary format for COPY |

Upsert

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| UseUpsert | bool | false | Enable INSERT ... ON CONFLICT |
| UpsertConflictColumns | string[]? | null | Conflict target columns |
| OnConflictAction | OnConflictAction | Update | Update or Ignore |
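
To make the upsert options concrete, the sketch below builds the kind of INSERT ... ON CONFLICT statement these settings describe. `UpsertSqlSketch` and its `Build` method are hypothetical names used only for illustration; the SQL the connector actually generates may differ, and this sketch neither quotes nor validates identifiers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the connector: shows the shape of an upsert statement.
public static class UpsertSqlSketch
{
    public static string Build(
        string table,
        IReadOnlyList<string> columns,
        IReadOnlyList<string> conflictColumns,
        bool updateOnConflict)
    {
        var cols = string.Join(", ", columns);
        var args = string.Join(", ", columns.Select(c => "@" + c));
        var target = string.Join(", ", conflictColumns);
        var sql = $"INSERT INTO {table} ({cols}) VALUES ({args}) ON CONFLICT ({target})";

        // Columns outside the conflict target are overwritten with the incoming (EXCLUDED) values.
        var updates = columns
            .Where(c => !conflictColumns.Contains(c))
            .Select(c => $"{c} = EXCLUDED.{c}")
            .ToList();

        // "Ignore" semantics, or nothing left to update: keep the existing row.
        if (!updateOnConflict || updates.Count == 0)
            return sql + " DO NOTHING";

        return sql + " DO UPDATE SET " + string.Join(", ", updates);
    }
}
```

With columns `id`, `customer`, `amount` and conflict column `id`, the update variant sets `customer` and `amount` from `EXCLUDED`, while the ignore variant ends in `DO NOTHING`.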

Error Handling

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ContinueOnError | bool | false | Continue on row-level errors |
| MaxRetryAttempts | int | 3 | Retry attempts for transient errors |
| RetryDelay | TimeSpan | 1s | Delay between retries |
| RowErrorHandler | Func<Exception, PostgresRow?, bool>? | null | Custom error handler |
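
As a rough illustration of how a retry count and a fixed delay interact, here is a minimal retry loop. `RetrySketch` is a hypothetical helper, not the connector's implementation. It assumes that MaxRetryAttempts counts retries after the initial attempt and that the delay between retries is constant; the connector may interpret either setting differently.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the connector: fixed-delay retry for transient failures.
public static class RetrySketch
{
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        Func<Exception, bool> isTransient,
        int maxRetryAttempts = 3,
        TimeSpan? retryDelay = null,
        CancellationToken cancellationToken = default)
    {
        var delay = retryDelay ?? TimeSpan.FromSeconds(1);

        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await operation().ConfigureAwait(false);
            }
            // Retry only transient errors, and only while retries remain;
            // anything else propagates to the caller unchanged.
            catch (Exception ex) when (attempt < maxRetryAttempts && isTransient(ex))
            {
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

One way to classify database errors as transient is a predicate such as `ex => ex is System.Data.Common.DbException { IsTransient: true }`, since `DbException.IsTransient` is available from .NET 5 onward.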

Dependency Injection

csharp
services.AddPostgresConnector(options =>
{
    options.DefaultConnectionString = "Host=localhost;Database=mydb;...";
    options.DefaultConfiguration = new PostgresConfiguration
    {
        WriteStrategy = PostgresWriteStrategy.Batch,
        BatchSize = 1000
    };
});

// Named connections for multi-database scenarios
services.AddPostgresConnection("analytics", "Host=analytics-db;...");
services.AddPostgresConnection("operational", "Host=ops-db;...");

Registers IPostgresConnectionPool, PostgresSourceNodeFactory, and PostgresSinkNodeFactory.

Attribute Mapping

Convention-Based

C# PascalCase property names are automatically converted to PostgreSQL snake_case column names:

  • CustomerId → customer_id
  • TotalAmount → total_amount
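
The conversion can be sketched as follows. `SnakeCaseSketch` is a hypothetical helper written for illustration; the connector's actual rules (for example, how it treats acronyms such as `HTTPStatus`) are not documented here and may differ.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of the connector: PascalCase to snake_case.
public static class SnakeCaseSketch
{
    public static string ToSnakeCase(string name)
    {
        if (string.IsNullOrEmpty(name)) return name;

        var sb = new StringBuilder(name.Length + 4);
        for (var i = 0; i < name.Length; i++)
        {
            var c = name[i];

            // Start a new word when an uppercase letter follows a lowercase letter or a digit.
            var startsWord = char.IsUpper(c) && i > 0
                && (char.IsLower(name[i - 1]) || char.IsDigit(name[i - 1]));
            if (startsWord) sb.Append('_');

            sb.Append(char.ToLowerInvariant(c));
        }
        return sb.ToString();
    }
}
```

Under these rules `SnakeCaseSketch.ToSnakeCase("CustomerId")` yields `customer_id` and `ToSnakeCase("TotalAmount")` yields `total_amount`, matching the two examples above.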

[Column] / [IgnoreColumn] (Cross-Connector)

csharp
using NPipeline.Connectors.Attributes;

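public class Customer
{
    [Column("customer_id")]
    public int CustomerId { get; set; }

    public string FirstName { get; set; } = "";
    public string LastName { get; set; } = "";

    // Computed property; excluded from column mapping.
    [IgnoreColumn]
    public string FullName => $"{FirstName} {LastName}";
}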

[PostgresColumn] (Connector-Specific)

Extends [Column] with PostgreSQL features:

csharp
using NPipeline.Connectors.Postgres.Mapping;
using NpgsqlTypes; // NpgsqlDbType

public class Customer
{
    [PostgresColumn("customer_id", PrimaryKey = true)]
    public int CustomerId { get; set; }

    [PostgresColumn("first_name", DbType = NpgsqlDbType.Varchar, Size = 100)]
    public string FirstName { get; set; } = "";

    [PostgresColumn("email", DbType = NpgsqlDbType.Varchar, Size = 255)]
    public string Email { get; set; } = "";
}

| Property | Description |
| --- | --- |
| Name | Column name in the database |
| DbType | PostgreSQL data type (NpgsqlDbType) |
| Size | Size/length for character types |
| PrimaryKey | Primary key (used for checkpointing) |
| Ignore | Skip mapping this property |

Use the cross-connector attributes ([Column], [IgnoreColumn]) for portable code; use [PostgresColumn] when you need control over the database type or primary key.

Delivery Semantics

| Semantic | Data Loss | Duplicates | Overhead | Use Case |
| --- | --- | --- | --- | --- |
| AtLeastOnce (default) | No | Possible | Low | Idempotent operations |
| AtMostOnce | Possible | No | Low | Telemetry, metrics |
| ExactlyOnce | No | No | High | Financial transactions |

csharp
var config = new PostgresConfiguration
{
    DeliverySemantic = DeliverySemantic.ExactlyOnce,
    UseTransaction = true,
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointStorage = new FileCheckpointStorage("checkpoints.json")
};

Checkpointing

| Strategy | Persistence | Description |
| --- | --- | --- |
| None (default) | - | No checkpointing; restart from beginning |
| InMemory | Process lifetime | Transient failure recovery within a single run |
| Offset | External storage | Track position via monotonically increasing column |
| KeyBased | External storage | Track processed items by composite keys |
| Cursor | External storage | Track cursor position |
| CDC | External storage | Track WAL position for logical replication |

Offset Example

csharp
var config = new PostgresConfiguration
{
    ConnectionString = connectionString,
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointOffsetColumn = "id",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/orders.json")
};

// Matches the (sql, configuration) constructor listed under Source Node.
var source = new PostgresSourceNode<Order>(
    "SELECT * FROM orders WHERE id > @lastCheckpoint ORDER BY id",
    config);

CDC Example (Logical Replication)

csharp
var config = new PostgresConfiguration
{
    CheckpointStrategy = CheckpointStrategy.CDC,
    CdcSlotName = "my_pipeline_slot",
    CdcPublicationName = "my_publication",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/cdc.json")
};

Checkpoint Intervals

csharp
config.CheckpointInterval = new CheckpointIntervalConfiguration
{
    RowCountInterval = 10_000,
    TimeInterval = TimeSpan.FromMinutes(5)
};
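
One plausible reading of these two settings is that a checkpoint is written as soon as either threshold is reached. The sketch below illustrates that reading only. `CheckpointTriggerSketch` is a hypothetical class, and the connector may combine the intervals differently.

```csharp
using System;

// Hypothetical helper, not a connector type: decides when a checkpoint is due.
public sealed class CheckpointTriggerSketch
{
    private readonly long _rowCountInterval;
    private readonly TimeSpan _timeInterval;
    private long _rowsSinceCheckpoint;
    private DateTimeOffset _lastCheckpoint;

    public CheckpointTriggerSketch(long rowCountInterval, TimeSpan timeInterval, DateTimeOffset startedAt)
    {
        _rowCountInterval = rowCountInterval;
        _timeInterval = timeInterval;
        _lastCheckpoint = startedAt;
    }

    // Call once per processed row; returns true when a checkpoint should be written.
    public bool OnRowProcessed(DateTimeOffset now)
    {
        _rowsSinceCheckpoint++;

        var rowsDue = _rowsSinceCheckpoint >= _rowCountInterval;
        var timeDue = now - _lastCheckpoint >= _timeInterval;
        if (!rowsDue && !timeDue) return false;

        // Reaching either threshold resets both the row counter and the timer.
        _rowsSinceCheckpoint = 0;
        _lastCheckpoint = now;
        return true;
    }
}
```

Passing the current time in as an argument keeps the logic deterministic and easy to unit test. Smaller intervals reduce the amount of reprocessing after a restart at the cost of more frequent checkpoint writes.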

Mapping

| Property | Default | Description |
| --- | --- | --- |
| CaseInsensitiveMapping | true | Match Id, id, ID to the same property |
| CacheMappingMetadata | true | Cache mapping delegates per type |
| ValidateIdentifiers | true | Validate SQL identifiers to prevent injection |
| UsePreparedStatements | true | Reduce query parsing overhead |

Performance

Streaming

csharp
var config = new PostgresConfiguration { StreamResults = true, FetchSize = 1_000 };

Without streaming, Npgsql loads the entire result set into memory.

| FetchSize | Best For |
| --- | --- |
| 100–500 | Memory-constrained, wide rows |
| 1,000–5,000 | Most workloads |
| 5,000–10,000 | Maximum throughput |

Write Strategy Comparison

| Strategy | Throughput | Latency | Error Isolation | Use Case |
| --- | --- | --- | --- | --- |
| PerRow | Low | Low | High | Real-time, per-row errors |
| Batch | High | Medium | Medium | ETL, balanced |
| Copy | Very High | High | Low | Bulk loads, data warehouse |

COPY Binary Format

csharp
config.UseBinaryCopy = true; // 20–30% faster than text format

Batch Size Guidelines

| Range | Best For |
| --- | --- |
| 100–500 | Real-time processing, low latency |
| 500–1,000 | Balanced throughput and latency |
| 1,000–5,000 | Bulk loading |

Best Practices

  1. Use DI with AddPostgresConnector for production
  2. Enable streaming (StreamResults = true) for large result sets
  3. Use COPY for bulk loading - highest throughput
  4. Enable binary COPY for additional 20–30% performance gain
  5. Enable upsert for idempotent writes
  6. Keep ValidateIdentifiers enabled - never disable it in production
  7. Use prepared statements for repeated query patterns
  8. Configure checkpointing for long-running pipelines

Released under the MIT License.