DuckDB Connector

The NPipeline.Connectors.DuckDB package provides source and sink nodes for DuckDB, an in-process analytical database. It is well suited to local analytics, file format conversion, and ad-hoc queries over Parquet and CSV files. It supports DuckDB's high-performance Appender API, automatic table creation, and direct file import/export.

Installation

```bash
dotnet add package NPipeline.Connectors.DuckDB
```

Dependencies: DuckDB.NET.Data.Full 1.x

Why DuckDB?

| | DuckDB | SQLite | PostgreSQL |
|---|---|---|---|
| Best for | Analytics, OLAP | OLTP, embedded | General-purpose server |
| Deployment | In-process (no server) | In-process (no server) | Requires server |
| Columnar storage | Yes | No | No |
| Parquet/CSV queries | Native (read_parquet()) | No | Via extensions |
| Concurrent writers | No | Limited | Yes |

Source Node - DuckDBSourceNode<T>

Constructors

```csharp
// Database path + query (null path = in-memory)
public DuckDBSourceNode(
    string? databasePath, string query,
    DuckDBConfiguration? configuration = null)

// With custom mapper
public DuckDBSourceNode(
    string? databasePath, string query,
    Func<DuckDBRow, T> rowMapper,
    DuckDBConfiguration? configuration = null)

// Connection factory (recommended for DI)
public DuckDBSourceNode(
    IDuckDBConnectionFactory connectionFactory, string query,
    DuckDBConfiguration? configuration = null)

// Direct file import (Parquet, CSV)
public static DuckDBSourceNode<T> FromFile(
    string filePath,
    DuckDBConfiguration? configuration = null)
```

Example: Query Parquet Files Directly

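```csharp
// A bare `null` first argument fits both the string? and the
// IDuckDBConnectionFactory overloads, so name the argument to pick one.
var source = new DuckDBSourceNode<SalesRecord>(
    databasePath: null, // in-memory
    query: "SELECT region, SUM(amount) AS total FROM read_parquet('sales/*.parquet') GROUP BY region");
```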

Sink Node - DuckDBSinkNode<T>

| Strategy | Description | Best For |
|---|---|---|
| Appender (default) | DuckDB Appender API | Maximum throughput (fastest) |
| Sql | Standard INSERT statements | Upserts, complex logic |

Constructors

```csharp
// Database path + table name
public DuckDBSinkNode(
    string? databasePath, string tableName,
    DuckDBConfiguration? configuration = null)

// Connection factory (recommended for DI)
public DuckDBSinkNode(
    IDuckDBConnectionFactory connectionFactory, string tableName,
    DuckDBConfiguration? configuration = null)

// Direct file export (Parquet, CSV)
public static DuckDBSinkNode<T> ToFile(
    string filePath,
    DuckDBConfiguration? configuration = null)
```
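
As a rough sketch of the file format conversion mentioned in the introduction, the two static factories can be paired to read one file and write another. Only FromFile and ToFile come from the signatures above. The NPipeline.Connectors.DuckDB namespace, the SalesRecord shape, the file names, and the idea that the format is inferred from the file extension are all assumptions, and wiring the two nodes into a pipeline is left out.

```csharp
using NPipeline.Connectors.DuckDB; // assumed namespace, matching the package name

// Read a Parquet file and write the same rows out as CSV.
// File names are placeholders; format detection by extension is assumed.
var source = DuckDBSourceNode<SalesRecord>.FromFile("sales.parquet");
var sink = DuckDBSinkNode<SalesRecord>.ToFile("sales.csv");

// Hypothetical row type. How properties map to columns is not described
// in this page, so treat the member names as illustrative only.
public sealed class SalesRecord
{
    public string Region { get; set; } = "";
    public decimal Amount { get; set; }
}
```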

Example: Auto-Create Table

```csharp
var config = new DuckDBConfiguration
{
    WriteStrategy = DuckDBWriteStrategy.Appender,
    AutoCreateTable = true
};

var sink = new DuckDBSinkNode<Order>("analytics.duckdb", "orders", configuration: config);
```

Configuration

Connection

| Property | Type | Default | Description |
|---|---|---|---|
| DatabasePath | string? | null | .duckdb file path. null = in-memory. |
| AccessMode | DuckDBAccessMode | Automatic | Automatic, ReadOnly, or ReadWrite |
| MemoryLimit | string? | null | Maximum memory (e.g., "4GB") |
| Threads | int | 0 | Thread count (0 = auto-detect) |
| TempDirectory | string? | null | Spill-to-disk directory |
| Extensions | string[]? | null | Extensions to load (e.g., "httpfs", "spatial") |
| Settings | Dictionary<string, string>? | null | Additional DuckDB session settings |

Read

| Property | Type | Default | Description |
|---|---|---|---|
| StreamResults | bool | true | Stream results row-by-row |
| FetchSize | int | 2048 | Rows per fetch batch |
| ProjectedColumns | string[]? | null | Column projection |
| CommandTimeout | int | 30 | Command timeout (seconds) |

Write

| Property | Type | Default | Description |
|---|---|---|---|
| WriteStrategy | DuckDBWriteStrategy | Appender | Appender (fastest) or Sql |
| AutoCreateTable | bool | true | Create table if it doesn't exist |
| TruncateBeforeWrite | bool | false | Truncate table before writing |
| UseTransaction | bool | true | Wrap writes in a transaction |
| BatchSize | int | 1000 | Batch size (for Sql strategy) |

Error Handling

| Property | Type | Default | Description |
|---|---|---|---|
| ContinueOnError | bool | false | Continue on row-level errors |
| RowErrorHandler | Func<Exception, long, bool>? | null | Error handler (receives row index) |
| Observer | IDuckDBConnectorObserver? | null | Lifecycle observer for metrics |
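
To make the RowErrorHandler signature concrete, here is a minimal, dependency-free sketch of a handler that matches Func<Exception, long, bool>. The RowErrorPolicy class is hypothetical, and the meaning of the return value (true to skip the row and continue, false to stop) is an assumption, since the table above does not spell it out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical policy: skip rows that fail with conversion errors,
// stop on anything else.
// Assumption: returning true means "skip this row and keep going";
// returning false means "stop". Check the connector's behaviour first.
public static class RowErrorPolicy
{
    // Row indexes that were skipped, kept for later reporting.
    public static readonly List<long> SkippedRows = new();

    public static bool Handle(Exception error, long rowIndex)
    {
        if (error is FormatException or InvalidCastException)
        {
            SkippedRows.Add(rowIndex);
            return true;
        }

        return false;
    }
}
```

The method group converts to the delegate type, so it could be assigned as `RowErrorHandler = RowErrorPolicy.Handle`, presumably alongside `ContinueOnError = true`. How the two properties interact is not documented here.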

Dependency Injection

```csharp
services.AddDuckDBConnector(options =>
{
    options.DefaultConfiguration = new DuckDBConfiguration
    {
        DatabasePath = "analytics.duckdb",
        MemoryLimit = "4GB"
    };
});

services.AddDuckDBDatabase("reporting", "reporting.duckdb", config =>
{
    config.AccessMode = DuckDBAccessMode.ReadOnly;
});
```
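
One possible way to consume the registration is to inject IDuckDBConnectionFactory and pass it to the factory-based constructor listed earlier. This sketch assumes that AddDuckDBConnector registers IDuckDBConnectionFactory in the container, that the types live in the NPipeline.Connectors.DuckDB namespace, and that a sales table exists. RegionTotalsSource and RegionTotal are hypothetical names, and resolving the named "reporting" database is not covered.

```csharp
using NPipeline.Connectors.DuckDB; // assumed namespace, matching the package name

// Hypothetical service that receives the connection factory from DI
// (assumes the container registers IDuckDBConnectionFactory).
public sealed class RegionTotalsSource(IDuckDBConnectionFactory connectionFactory)
{
    // Uses the documented (connectionFactory, query, configuration) constructor.
    public DuckDBSourceNode<RegionTotal> Create() =>
        new(connectionFactory,
            "SELECT region, SUM(amount) AS total FROM sales GROUP BY region");
}

// Hypothetical row type; column-to-property mapping rules are assumed.
public sealed class RegionTotal
{
    public string Region { get; set; } = "";
    public decimal Total { get; set; }
}
```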

Write Strategy Comparison

| Strategy | Description | Best For |
|---|---|---|
| Appender (default) | DuckDB native appender - fastest path | Bulk loads, ETL |
| Sql | Standard SQL INSERT statements | Small volumes, complex logic |

The Appender strategy bypasses SQL parsing entirely and writes directly to DuckDB's storage engine.
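
For a sense of what appending looks like underneath, the sketch below uses the DuckDB.NET Appender directly, without the connector. Whether the sink drives the Appender in exactly this way is an assumption, and the orders table and its columns are illustrative.

```csharp
using System;
using DuckDB.NET.Data;

// In-memory database, used only to keep the sketch self-contained.
using var connection = new DuckDBConnection("Data Source=:memory:");
connection.Open();

using (var create = connection.CreateCommand())
{
    create.CommandText = "CREATE TABLE orders (id INTEGER, customer VARCHAR)";
    create.ExecuteNonQuery();
}

// Rows go through the appender rather than through INSERT statements.
// Disposing the appender flushes any buffered rows to the table.
using (var appender = connection.CreateAppender("orders"))
{
    for (var i = 1; i <= 3; i++)
    {
        appender.CreateRow()
            .AppendValue(i)
            .AppendValue($"customer-{i}")
            .EndRow();
    }
}

// Read back the number of rows that were appended.
using var count = connection.CreateCommand();
count.CommandText = "SELECT COUNT(*) FROM orders";
var rowCount = Convert.ToInt64(count.ExecuteScalar());
Console.WriteLine(rowCount);
```

Values are appended in column order and must match the column types, which is part of why the Sql strategy remains the better fit for upserts and other per-row logic.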

DuckDB Extensions

Load extensions for additional capabilities:

```csharp
var config = new DuckDBConfiguration
{
    Extensions = ["httpfs", "spatial", "json"],
    Settings = new Dictionary<string, string>
    {
        ["s3_region"] = "us-east-1",
        ["s3_access_key_id"] = "...",
        ["s3_secret_access_key"] = "..."
    }
};
```

| Extension | Description |
|---|---|
| httpfs | Read from HTTP/S3 URLs |
| spatial | Spatial data types and functions |
| json | JSON file reading/writing |
| parquet | Parquet file support (built-in) |

Performance

Memory & Spill-to-Disk

```csharp
var config = new DuckDBConfiguration
{
    MemoryLimit = "4GB",
    TempDirectory = "/tmp/duckdb-spill",
    Threads = 8
};
```

When a query's intermediate results no longer fit within MemoryLimit, DuckDB spills them to disk under TempDirectory.
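
For orientation, these three options match DuckDB's own memory_limit, temp_directory, and threads settings by name. The sketch below sets them by hand through DuckDB.NET. That the connector applies them in this way is an assumption, and the spill path is a placeholder.

```csharp
using System;
using DuckDB.NET.Data;

using var connection = new DuckDBConnection("Data Source=:memory:");
connection.Open();

// Each SET is issued as its own command to keep the sketch simple.
string[] statements =
[
    "SET memory_limit = '4GB'",
    "SET temp_directory = '/tmp/duckdb-spill'",
    "SET threads = 8",
];

foreach (var sql in statements)
{
    using var set = connection.CreateCommand();
    set.CommandText = sql;
    set.ExecuteNonQuery();
}

// Read one setting back to confirm it took effect.
using var check = connection.CreateCommand();
check.CommandText = "SELECT current_setting('threads')";
var threads = Convert.ToInt64(check.ExecuteScalar());
Console.WriteLine(threads);
```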

Auto-Create Table

```csharp
var sink = new DuckDBSinkNode<SalesRecord>(
    "analytics.duckdb", "sales",
    new DuckDBConfiguration
    {
        AutoCreateTable = true,        // infer schema from T
        TruncateBeforeWrite = false    // append mode
    });
```

Best Practices

  1. Use Appender strategy (default) - significantly faster than SQL inserts
  2. Set MemoryLimit to prevent unbounded memory growth
  3. Configure TempDirectory for large datasets that exceed memory
  4. Use ReadOnly access mode for concurrent read pipelines
  5. Use in-memory mode (DatabasePath = null) for ephemeral analytical pipelines
  6. Load extensions at configuration time - not mid-pipeline

Released under the MIT License.