
Snowflake Connector

The NPipeline.Connectors.Snowflake package provides source and sink nodes for Snowflake. It supports parameterized queries, batched inserts, high-throughput staged COPY loads, MERGE-based upserts, and named connections for multi-warehouse scenarios.

Installation

```bash
dotnet add package NPipeline.Connectors.Snowflake
```

Dependencies: Snowflake.Data 5.x

Source Node - SnowflakeSourceNode<T>

Constructors

```csharp
// Connection string + query
public SnowflakeSourceNode(
    string connectionString, string query,
    SnowflakeConfiguration? configuration = null)

// With custom mapper
public SnowflakeSourceNode(
    string connectionString, string query,
    Func<SnowflakeRow, T>? customMapper = null,
    SnowflakeConfiguration? configuration = null)

// Connection pool (recommended for DI)
public SnowflakeSourceNode(
    ISnowflakeConnectionPool connectionPool, string query,
    SnowflakeConfiguration? configuration = null,
    DatabaseParameter[]? parameters = null,
    bool continueOnError = false,
    string? connectionName = null)
```

Example

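```csharp
// This overload takes no bind parameters, so the filter uses a literal value.
// To bind a value such as :start_date, use the ISnowflakeConnectionPool overload
// and pass it through its `parameters` argument.
var source = new SnowflakeSourceNode<SalesRecord>(
    "account=myaccount;user=app;password=secret;warehouse=COMPUTE_WH;database=SALES;schema=PUBLIC;",
    "SELECT * FROM orders WHERE order_date >= '2024-01-01'",
    configuration: new SnowflakeConfiguration { FetchSize = 10000 });
```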

Sink Node - SnowflakeSinkNode<T>

| Strategy | Description | Best For |
|---|---|---|
| PerRow | Individual INSERT per item | Small volumes |
| Batch (default) | Batched INSERT statements | Most workloads |
| StagedCopy | PUT + COPY INTO via internal stage | Maximum throughput |

Constructors

```csharp
// Connection string
public SnowflakeSinkNode(
    string connectionString, string tableName,
    SnowflakeConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null)

// Connection pool (recommended for DI)
public SnowflakeSinkNode(
    ISnowflakeConnectionPool connectionPool, string tableName,
    SnowflakeConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null,
    string? connectionName = null)
```

Example: Staged Copy

```csharp
var config = new SnowflakeConfiguration
{
    ConnectionString = "account=myaccount;user=app;...",
    WriteStrategy = SnowflakeWriteStrategy.StagedCopy,
    StageName = "@my_stage"
};

var sink = new SnowflakeSinkNode<SalesRecord>(
    config.ConnectionString, "SALES.PUBLIC.ORDERS",
    configuration: config);
```

Configuration

Connection

| Property | Type | Default | Description |
|---|---|---|---|
| ConnectionString | string | "" | Snowflake connection string |
| Account | string | "" | Account identifier |
| User | string | "" | Username |
| Role | string | "" | Role |
| Warehouse | string | "" | Warehouse |
| Database | string | "" | Database |
| Schema | string | "PUBLIC" | Schema |
| Authenticator | string | "snowflake" | Authentication type (snowflake, externalbrowser, snowflake_jwt) |
| PrivateKeyPath | string? | null | Path to the private key file (key pair authentication) |
| CommandTimeout | int | 300 | Command timeout (seconds) |
| ConnectionTimeout | int | 30 | Connection timeout (seconds) |
| MinPoolSize | int | 1 | Minimum pool size |
| MaxPoolSize | int | 10 | Maximum pool size |

Read

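| Property | Type | Default | Description |
|---|---|---|---|
| StreamResults | bool | true | Stream rows as they are fetched instead of buffering the full result set |
| FetchSize | int | 10000 | Rows per fetch |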

Write

| Property | Type | Default | Description |
|---|---|---|---|
| WriteStrategy | SnowflakeWriteStrategy | Batch | PerRow, Batch, or StagedCopy |
| BatchSize | int | 1000 | Items per batch |
| MaxBatchSize | int | 16384 | Upper bound on batch size (Snowflake's multi-row INSERT limit) |
| UseTransaction | bool | true | Wrap writes in a transaction |
| StageName | string? | null | Stage name used by StagedCopy |
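The interaction between BatchSize and MaxBatchSize can be pictured as follows. This is a minimal sketch, not the connector's code: the helper `ToBatches` is hypothetical, and it assumes that an oversized BatchSize is clamped to MaxBatchSize rather than rejected.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: groups rows into INSERT batches of `batchSize`, clamped to
// `maxBatchSize` (Snowflake caps a multi-row INSERT ... VALUES at 16,384 rows).
// Assumption: clamping, not rejecting, is how an oversized BatchSize is handled.
IEnumerable<List<T>> ToBatches<T>(IEnumerable<T> rows, int batchSize, int maxBatchSize = 16384)
{
    int size = Math.Clamp(batchSize, 1, maxBatchSize);
    var batch = new List<T>(size);
    foreach (var row in rows)
    {
        batch.Add(row);
        if (batch.Count == size)
        {
            yield return batch;          // full batch: one INSERT statement
            batch = new List<T>(size);
        }
    }
    if (batch.Count > 0)
        yield return batch;              // final partial batch
}

// 40,000 rows with a requested BatchSize of 20,000 become three statements.
var sizes = ToBatches(Enumerable.Range(1, 40_000), batchSize: 20_000)
    .Select(b => b.Count)
    .ToList();
Console.WriteLine(string.Join(", ", sizes)); // 16384, 16384, 7232
```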

Upsert (MERGE)

| Property | Type | Default | Description |
|---|---|---|---|
| UseUpsert | bool | false | Enable MERGE |
| UpsertKeyColumns | string[]? | null | Key columns for MERGE matching |
| OnMergeAction | OnMergeAction | - | Action for matched rows: Update, Ignore, or Delete |
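For orientation, the sketch below produces the general shape of a Snowflake MERGE for the Update case, matching on the upsert key columns. The helper `BuildMerge` and the `ORDERS_STAGING` source are hypothetical illustrations; the SQL the connector actually emits may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: builds a MERGE that upserts rows from `source` into `target`,
// matching on `keyColumns` (the UpsertKeyColumns role). Covers only the Update action;
// Ignore would drop the WHEN MATCHED clause and Delete would use WHEN MATCHED THEN DELETE.
// Note: if every column is a key column, the SET list is empty and the SQL is invalid.
string BuildMerge(string target, string source,
                  IReadOnlyList<string> keyColumns, IReadOnlyList<string> columns)
{
    string on = string.Join(" AND ", keyColumns.Select(k => $"t.{k} = s.{k}"));
    // Non-key columns are overwritten with the incoming values on a match.
    string set = string.Join(", ", columns.Except(keyColumns).Select(c => $"t.{c} = s.{c}"));
    string insertCols = string.Join(", ", columns);
    string insertVals = string.Join(", ", columns.Select(c => $"s.{c}"));
    return $"MERGE INTO {target} t USING {source} s ON {on} " +
           $"WHEN MATCHED THEN UPDATE SET {set} " +
           $"WHEN NOT MATCHED THEN INSERT ({insertCols}) VALUES ({insertVals})";
}

Console.WriteLine(BuildMerge("SALES.PUBLIC.ORDERS", "ORDERS_STAGING",
    new[] { "ORDER_ID" }, new[] { "ORDER_ID", "AMOUNT", "ORDER_DATE" }));
```

Because a MERGE with the same keys converges to the same final table state, re-running a load after a partial failure does not create duplicate rows.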

Error Handling

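| Property | Type | Default | Description |
|---|---|---|---|
| ContinueOnError | bool | false | Continue processing after an error instead of failing |
| MaxRetryAttempts | int | 3 | Maximum number of retry attempts |
| RetryDelay | TimeSpan | - | Delay between retries |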
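The retry settings describe a fixed-delay retry loop. The sketch below shows that pattern. It is an illustration, not the connector's implementation: `RetryAsync` is a hypothetical helper, and it assumes that MaxRetryAttempts counts retries after the first attempt and that every exception is treated as transient.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper mirroring MaxRetryAttempts / RetryDelay.
// Assumption: maxRetryAttempts = retries after the first try (so up to 1 + N calls).
async Task<T> RetryAsync<T>(Func<Task<T>> operation, int maxRetryAttempts, TimeSpan retryDelay)
{
    for (int attempt = 0; ; attempt++)
    {
        try
        {
            return await operation();
        }
        catch (Exception) when (attempt < maxRetryAttempts)
        {
            // Swallow the failure only while retries remain; the final failure propagates.
            await Task.Delay(retryDelay);
        }
    }
}

// Usage: succeeds on the first call, so no delay is incurred.
Console.WriteLine(await RetryAsync(() => Task.FromResult("ok"),
    maxRetryAttempts: 3, retryDelay: TimeSpan.FromSeconds(1))); // ok
```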

Dependency Injection

```csharp
services.AddSnowflakeConnector(options =>
{
    options.DefaultConnectionString = "account=myaccount;user=app;...";
    options.DefaultConfiguration = new SnowflakeConfiguration
    {
        Warehouse = "COMPUTE_WH",
        WriteStrategy = SnowflakeWriteStrategy.Batch
    };
});

services.AddSnowflakeConnection("etl", "account=myaccount;warehouse=ETL_WH;...");
```

Attribute Mapping

[Column] / [IgnoreColumn] (Cross-Connector)

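```csharp
using NPipeline.Connectors.Attributes;

public class SalesRecord
{
    [Column("ORDER_ID")]
    public int OrderId { get; set; }

    public int Quantity { get; set; }

    public decimal UnitPrice { get; set; }

    // Computed in C# only; excluded from column mapping.
    [IgnoreColumn]
    public decimal CalculatedTotal => Quantity * UnitPrice;
}
```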

[SnowflakeColumn] (Connector-Specific)

```csharp
using NPipeline.Connectors.Snowflake.Mapping;

[SnowflakeTable("ORDERS", Schema = "SALES")]
public class SalesRecord
{
    [SnowflakeColumn("ORDER_ID", PrimaryKey = true)]
    public int OrderId { get; set; }

    [SnowflakeColumn("AMOUNT", DbType = "NUMBER(10,2)")]
    public decimal Amount { get; set; }
}
```
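
| Property | Description |
|---|---|
| Name | Column name |
| DbType | Snowflake data type string (e.g. NUMBER(10,2)) |
| PrimaryKey | Marks the key column used for checkpointing |
| Ignore | Skip mapping for this property |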

Convention-Based

By default, C# PascalCase property names map to Snowflake UPPER_SNAKE_CASE column names (for example, OrderId → ORDER_ID).
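The sketch below shows one way to perform the PascalCase to UPPER_SNAKE_CASE conversion. `ToUpperSnakeCase` is a hypothetical helper, and its handling of acronyms (HTTPStatus → HTTP_STATUS) and digits is an assumption. The connector's own convention may treat those cases differently, so use `[SnowflakeColumn]` wherever the exact name matters.

```csharp
using System;
using System.Text;

// Hypothetical helper: PascalCase -> UPPER_SNAKE_CASE.
// An underscore goes before an uppercase letter that follows a lowercase letter or digit,
// or that ends an acronym run (uppercase preceded by uppercase and followed by lowercase).
string ToUpperSnakeCase(string name)
{
    var sb = new StringBuilder(name.Length + 8);
    for (int i = 0; i < name.Length; i++)
    {
        char c = name[i];
        if (i > 0 && char.IsUpper(c))
        {
            char prev = name[i - 1];
            bool nextIsLower = i + 1 < name.Length && char.IsLower(name[i + 1]);
            if (char.IsLower(prev) || char.IsDigit(prev) || (char.IsUpper(prev) && nextIsLower))
                sb.Append('_');
        }
        sb.Append(char.ToUpperInvariant(c));
    }
    return sb.ToString();
}

Console.WriteLine(ToUpperSnakeCase("OrderId")); // ORDER_ID
```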

Delivery Semantics

| Semantic | Data Loss | Duplicates | Overhead |
|---|---|---|---|
| AtLeastOnce (default) | No | Possible | Low |
| AtMostOnce | Possible | No | Low |
| ExactlyOnce | No | No | High |

Checkpointing

| Strategy | Description |
|---|---|
| None (default) | No checkpointing |
| InMemory | Transient recovery within a single run |
| Offset | Persist position via a monotonic column |
| KeyBased | Track processed items by composite keys |
| Cursor | Cursor position tracking |

```csharp
var config = new SnowflakeConfiguration
{
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointOffsetColumn = "ORDER_ID",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/snowflake.json")
};
```
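Offset checkpointing can be modeled in a few lines. This is an in-memory sketch, not the connector's storage format: `checkpoint` stands in for the persisted value, and `RunOnce` is a hypothetical stand-in for one pipeline run that can crash part of the way through. Because the offset is committed after each row is handled, a crash between handling and committing would reprocess that row, which matches at-least-once delivery.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for the persisted offset (e.g. the value kept by FileCheckpointStorage).
long? checkpoint = null;

// Hypothetical single run over a monotonic ORDER_ID column.
// `failAfter` simulates a crash after that many rows have been handled.
List<long> RunOnce(IReadOnlyList<long> orderIds, int failAfter)
{
    long? start = checkpoint;
    var processed = new List<long>();
    // Resume strictly after the last committed offset, in ascending order.
    foreach (var id in orderIds.Where(id => start is null || id > start).OrderBy(id => id))
    {
        if (processed.Count == failAfter) break;   // simulated crash
        processed.Add(id);                         // "handle" the row
        checkpoint = id;                           // commit the offset afterwards
    }
    return processed;
}

var ids = new long[] { 101, 102, 103, 104, 105 };
var first = RunOnce(ids, failAfter: 2);              // handles 101, 102, then crashes
var second = RunOnce(ids, failAfter: int.MaxValue);  // resumes after 102
Console.WriteLine(string.Join(",", second));         // 103,104,105
```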

Performance

Write Strategy Comparison

| Strategy | Throughput | Latency | Use Case |
|---|---|---|---|
| PerRow | Low | Low | Small volumes, debugging |
| Batch | Medium | Medium | Most workloads |
| StagedCopy | Very High | High | Bulk loads |

Snowflake-Specific Considerations

  • Connection latency: establishing a Snowflake connection takes 2–5 s, so use connection pooling.
  • Identifiers: Snowflake stores unquoted identifiers in uppercase. Use [SnowflakeColumn] or convention mapping so property names match column names.
  • Query tagging: set ApplicationName so the pipeline's queries can be identified in Snowflake query history.
  • Warehouse sizing: match the warehouse size to the pipeline's throughput needs.
  • Max batch size: Snowflake limits a multi-row INSERT ... VALUES statement to 16,384 rows.

Best Practices

  1. Use StagedCopy for bulk loads. PUT + COPY INTO is significantly faster than batched INSERTs.
  2. Use connection pooling, because establishing a connection is slow.
  3. Keep BatchSize at or below MaxBatchSize (16,384 by default), which is Snowflake's row limit for a single multi-row INSERT.
  4. Enable upsert (MERGE) for idempotent loads.
  5. Use key pair authentication (Authenticator = "snowflake_jwt") for service accounts.
  6. Size your warehouse appropriately for the load.


Released under the MIT License.