
Connectors

Prerequisites: Key Concepts

A connector is a NuGet package that provides source and/or sink nodes for a specific data system. Each connector handles serialization, connection management, and system-specific optimizations.

Choosing a Connector

File Formats

For reading and writing data files. To read from or write to cloud storage, pair a file connector with a storage provider.

| Connector | Format | Best For | Package |
|---|---|---|---|
| CSV | CSV/TSV | Tabular data, spreadsheet exports | NPipeline.Connectors.Csv |
| JSON | JSON array, NDJSON | API data, config files, document streams | NPipeline.Connectors.Json |
| Parquet | Apache Parquet | Large analytical datasets, columnar queries | NPipeline.Connectors.Parquet |
| Excel | XLS/XLSX | Spreadsheet data, business reports | NPipeline.Connectors.Excel |
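The two JSON formats differ in how a reader can consume them. A JSON array is a single value, so a standard parser reads it as one document, while NDJSON holds one self-contained object per line, so records can be parsed one at a time as a stream. The sketch below illustrates that difference with System.Text.Json directly, not with the connector's API; the two-field order records are hypothetical sample data.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;

// The same hypothetical orders in both layouts.
string jsonArray = "[{\"Id\":1,\"Total\":9.5},{\"Id\":2,\"Total\":20.0}]";
string ndjson = "{\"Id\":1,\"Total\":9.5}\n{\"Id\":2,\"Total\":20.0}\n";

// JSON array: the whole document is one value, so it is parsed as a unit.
List<(int Id, double Total)> ParseArray(string json)
{
    using var doc = JsonDocument.Parse(json);
    return doc.RootElement.EnumerateArray()
        .Select(e => (e.GetProperty("Id").GetInt32(), e.GetProperty("Total").GetDouble()))
        .ToList();
}

// NDJSON: each non-empty line is an independent JSON object, so records
// are yielded one at a time without holding the whole input in memory.
IEnumerable<(int Id, double Total)> ParseNdjson(TextReader reader)
{
    string? line;
    while ((line = reader.ReadLine()) != null)
    {
        if (string.IsNullOrWhiteSpace(line)) continue;
        using var doc = JsonDocument.Parse(line);
        var e = doc.RootElement;
        yield return (e.GetProperty("Id").GetInt32(), e.GetProperty("Total").GetDouble());
    }
}

var fromArray = ParseArray(jsonArray);
var fromNdjson = ParseNdjson(new StringReader(ndjson)).ToList();
Console.WriteLine(fromArray.SequenceEqual(fromNdjson)); // True
```

Both layouts produce the same records; the NDJSON reader only ever parses one line at a time, which is what makes it a good fit for large document streams.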

Databases

For reading from and writing to relational and document databases.

| Connector | System | Key Features | Package |
|---|---|---|---|
| PostgreSQL | PostgreSQL | COPY protocol, upsert, streaming results | NPipeline.Connectors.Postgres |
| MySQL | MySQL/MariaDB | Bulk load, upsert, CDC support | NPipeline.Connectors.MySQL |
| SQL Server | SQL Server | BulkCopy, MERGE upsert, streaming | NPipeline.Connectors.SqlServer |
| Snowflake | Snowflake | JWT auth, batch write, streaming results | NPipeline.Connectors.Snowflake |
| MongoDB | MongoDB | Upsert, change streams, checkpointing | NPipeline.Connectors.MongoDB |
| Cosmos DB | Azure Cosmos DB | SQL/Mongo/Cassandra APIs, change feed | NPipeline.Connectors.Azure.CosmosDb |
| DuckDB | DuckDB | Appender writes, auto-create tables | NPipeline.Connectors.DuckDB |
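Several database connectors list upsert support. Conceptually, an upsert inserts a row when its key is new and updates the existing row when the key already exists (PostgreSQL expresses this with INSERT ... ON CONFLICT, SQL Server with MERGE). The in-memory sketch below only illustrates those semantics, using a Dictionary keyed by a hypothetical order ID; it uses no NPipeline API and is not how any connector is implemented.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for a table keyed by primary key (hypothetical order ID -> status).
var table = new Dictionary<int, string>();

// Upsert: insert when the key is absent, overwrite when it is present.
// Returns true for an insert, false for an update.
bool Upsert(Dictionary<int, string> target, int key, string value)
{
    bool inserted = !target.ContainsKey(key);
    target[key] = value;
    return inserted;
}

var incoming = new (int Id, string Status)[]
{
    (1, "pending"),
    (2, "pending"),
    (1, "shipped"), // same key as the first row: updates it instead of adding a duplicate
};

int inserts = 0, updates = 0;
foreach (var (id, status) in incoming)
{
    if (Upsert(table, id, status)) inserts++; else updates++;
}

Console.WriteLine($"rows={table.Count} inserts={inserts} updates={updates}");
// rows=2 inserts=2 updates=1
```

Because a replayed row overwrites rather than duplicates, re-running the same batch leaves the table in the same final state, which is why upsert pairs well with retries.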

Message Queues

For consuming from and publishing to message brokers.

| Connector | System | Key Features | Package |
|---|---|---|---|
| Kafka | Apache Kafka | Consumer groups, idempotent writes, transactions | NPipeline.Connectors.Kafka |
| RabbitMQ | RabbitMQ | Topology management, acknowledgment strategies | NPipeline.Connectors.RabbitMQ |
| AWS SQS | Amazon SQS | Long polling, batch operations, dead-letter queues | NPipeline.Connectors.Aws.Sqs |
| Azure Service Bus | Azure Service Bus | Queues, topics, sessions, batch sending | NPipeline.Connectors.Azure.ServiceBus |
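Brokers like these commonly deliver messages at least once, so the same message can arrive again after a retry or redelivery. One common way to keep downstream processing safe is to deduplicate on a message ID. The sketch below shows that idea with an in-memory HashSet; the message IDs and amounts are hypothetical, a real consumer would need durable storage for the seen IDs, and none of this is a connector API.

```csharp
using System;
using System.Collections.Generic;

// IDs of messages already handled (in memory here; a real consumer would persist them).
var processed = new HashSet<string>();
var totals = new List<decimal>();

// Handle a message only the first time its ID is seen.
// HashSet.Add returns false when the ID is already present.
bool HandleOnce(string messageId, decimal amount)
{
    if (!processed.Add(messageId)) return false; // duplicate delivery: skip it
    totals.Add(amount);
    return true;
}

// "m-2" arrives twice, as it might after a redelivery.
var deliveries = new (string Id, decimal Amount)[]
{
    ("m-1", 10m), ("m-2", 25m), ("m-2", 25m), ("m-3", 5m),
};

foreach (var (id, amount) in deliveries)
{
    HandleOnce(id, amount);
}

decimal sum = 0;
foreach (var t in totals) sum += t;
Console.WriteLine($"processed={totals.Count} sum={sum}"); // processed=3 sum=40
```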

Specialized

| Connector | System | Key Features | Package |
|---|---|---|---|
| HTTP | REST APIs | Pagination, auth providers, rate limiting | NPipeline.Connectors.Http |
| Data Lake | Hive-style tables | Partitioned Parquet, snapshots, compaction | NPipeline.Connectors.DataLake |
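In a Hive-style table, partition values are encoded in directory names as column=value segments (for example year=2024/month=03/), so a reader can skip whole directories that a query does not need. The sketch below builds such a path for one record; the table root, the partition columns, and the zero-padding are hypothetical choices for illustration, not the Data Lake connector's actual layout. A production implementation would also escape special characters in partition values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Build a Hive-style partition directory: each partition column becomes a
// "column=value" path segment, in the order the columns are listed.
string PartitionPath(string tableRoot, IEnumerable<(string Column, string Value)> partitions)
{
    var segments = partitions.Select(p => $"{p.Column}={p.Value}");
    return string.Join("/", new[] { tableRoot.TrimEnd('/') }.Concat(segments));
}

// Hypothetical record partitioned by event date.
var eventDate = new DateTime(2024, 3, 7);
var path = PartitionPath("s3://bucket/events", new[]
{
    ("year", eventDate.Year.ToString()),
    ("month", eventDate.Month.ToString("00")),
    ("day", eventDate.Day.ToString("00")),
});

Console.WriteLine(path); // s3://bucket/events/year=2024/month=03/day=07
```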

Common Patterns

Installation

```bash
dotnet add package NPipeline.Connectors.Json
```

Source → Transform → Sink

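Most pipelines that use connectors follow the same shape: a source node reads records, one or more transforms process them, and a sink node writes the results: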

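```csharp
// Order, ValidatedOrder, and the ValidateOrder transform are user-defined types (not shown).
public void Define(PipelineBuilder builder, PipelineContext context)
{
    // Source node: reads Order records from JSON
    var source = builder.AddSource<JsonSourceNode<Order>, Order>("read");
    // Transform node: turns each Order into a ValidatedOrder
    var transform = builder.AddTransform<ValidateOrder, Order, ValidatedOrder>("validate");
    // Sink node: writes ValidatedOrder records to PostgreSQL
    var sink = builder.AddSink<PostgresSinkNode<ValidatedOrder>, ValidatedOrder>("write");

    // Wire the nodes together: source -> transform -> sink
    builder.Connect(source, transform);
    builder.Connect(transform, sink);
}
```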
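As a rough mental model only, assuming records flow through the stages one at a time, the sketch below reproduces the source, transform, and sink flow with plain IAsyncEnumerable<T> iterators instead of NPipeline node types. The in-memory orders, the positive-total validation rule, and the list standing in for the PostgreSQL sink are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Source: yields hypothetical raw orders one at a time.
async IAsyncEnumerable<(int Id, decimal Total)> ReadOrders()
{
    var raw = new[] { (1, 12.50m), (2, -3.00m), (3, 40.00m) };
    foreach (var order in raw)
    {
        await Task.Yield(); // stands in for I/O such as reading a file
        yield return order;
    }
}

// Transform: passes through only orders with a positive total.
async IAsyncEnumerable<(int Id, decimal Total)> Validate(IAsyncEnumerable<(int Id, decimal Total)> orders)
{
    await foreach (var order in orders)
    {
        if (order.Total > 0) yield return order;
    }
}

// Sink: drains the stream into a destination (a list here instead of a database).
async Task<List<(int Id, decimal Total)>> WriteAll(IAsyncEnumerable<(int Id, decimal Total)> rows)
{
    var sinkRows = new List<(int Id, decimal Total)>();
    await foreach (var row in rows) sinkRows.Add(row);
    return sinkRows;
}

// Wire the stages: source -> transform -> sink.
var written = await WriteAll(Validate(ReadOrders()));
Console.WriteLine(string.Join(", ", written.Select(r => r.Id))); // 1, 3
```

Each stage only pulls the next record when it needs it, so no stage has to hold the whole dataset in memory.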

Storage Provider Integration

File-based connectors (CSV, JSON, Parquet, Excel) read and write through an IStorageProvider rather than accessing the file system directly. As a result, the same connector code works with local files, S3, Azure Blob, GCS, or SFTP:

```csharp
// Newline-delimited JSON: one object per line
var config = new JsonConfiguration { Format = JsonFormat.NewlineDelimited };
// s3Options holds your S3 connection settings (not shown)
var storageProvider = new AwsS3StorageProvider(s3Options);
var source = new JsonSourceNode<Order>(config, storageProvider, new StorageUri("s3://bucket/orders.ndjson"));
```
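The benefit of this design is that reading code depends only on an abstraction, and the backend is chosen from the location's URI scheme. The sketch below illustrates the idea with a hypothetical registry of "open this URI" functions keyed by scheme and an in-memory "mem" backend; these names are invented for illustration and are not NPipeline's IStorageProvider API.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical in-memory objects standing in for a bucket.
var fakeBucket = new Dictionary<string, string>
{
    ["/orders.ndjson"] = "{\"Id\":1}\n{\"Id\":2}\n",
};

// Each backend is just "open this URI for reading", registered by URI scheme.
// A real system would register S3, Azure Blob, SFTP, and so on.
var backends = new Dictionary<string, Func<Uri, TextReader>>(StringComparer.OrdinalIgnoreCase)
{
    ["mem"] = uri => new StringReader(fakeBucket[uri.AbsolutePath]),
    ["file"] = uri => new StreamReader(uri.LocalPath),
};

// Resolve a location to a reader by looking up its scheme.
TextReader Open(string location)
{
    var uri = new Uri(location);
    if (!backends.TryGetValue(uri.Scheme, out var open))
        throw new NotSupportedException($"No backend registered for scheme '{uri.Scheme}'.");
    return open(uri);
}

// "Connector" code: counts non-empty lines without knowing which backend served them.
int CountLines(string location)
{
    using var reader = Open(location);
    int count = 0;
    string? line;
    while ((line = reader.ReadLine()) != null)
    {
        if (line.Length > 0) count++;
    }
    return count;
}

Console.WriteLine(CountLines("mem://bucket/orders.ndjson")); // 2
```

Swapping storage then means registering a different backend, while CountLines stays unchanged.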

🔗 See also: Storage Providers for choosing and configuring storage backends.

Next Steps

  • Pick a connector from the tables above to see configuration details
  • Storage Providers - configure where file-based connectors read/write
  • Custom Nodes - build your own source or sink

Released under the MIT License.