Cosmos DB Connector

The NPipeline.Connectors.Azure.CosmosDb package provides source and sink nodes for Azure Cosmos DB. It supports three Cosmos DB APIs: SQL (Core), MongoDB, and Cassandra. Features include connection pooling, transactional batches, change feed processing, and multiple authentication modes.

Installation

```bash
dotnet add package NPipeline.Connectors.Azure.CosmosDb
```

Dependencies: Microsoft.Azure.Cosmos 3.x, Azure.Identity 1.x, MongoDB.Driver 3.x, CassandraCSharpDriver 3.x

API Types

| API | Node Classes | When to Use |
| --- | --- | --- |
| SQL (default) | CosmosSourceNode<T>, CosmosSinkNode<T> | Standard Cosmos DB with SQL queries |
| MongoDB | CosmosMongoSourceNode<T>, CosmosMongoSinkNode<T> | Cosmos DB with MongoDB wire protocol |
| Cassandra | CosmosCassandraSourceNode<T>, CosmosCassandraSinkNode<T> | Cosmos DB with Cassandra wire protocol |

Source Node - CosmosSourceNode<T> (SQL API)

Constructors

```csharp
// Connection string
public CosmosSourceNode(
    string connectionString,
    string databaseId, string containerId,
    string query,
    Func<CosmosRow, T>? mapper = null,
    CosmosConfiguration? configuration = null,
    DatabaseParameter[]? parameters = null,
    bool continueOnError = false)

// Connection pool (recommended for DI)
public CosmosSourceNode(
    ICosmosConnectionPool connectionPool,
    string databaseId, string containerId,
    string query,
    Func<CosmosRow, T>? mapper = null,
    CosmosConfiguration? configuration = null,
    DatabaseParameter[]? parameters = null,
    bool continueOnError = false,
    string? connectionName = null)
```

Example

```csharp
var source = new CosmosSourceNode<Order>(
    "AccountEndpoint=https://mydb.documents.azure.com:443/;AccountKey=...",
    "orders-db", "orders",
    "SELECT * FROM c WHERE c.status = 'pending'");
```
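
The connection-pool overload listed under Constructors can be used in much the same way. The following is a minimal sketch, not a confirmed pattern: it assumes that ICosmosConnectionPool can be injected once the connector is registered (see Dependency Injection), that the types live in a namespace matching the package name, and that Order and PendingOrdersSourceFactory are hypothetical names introduced only for illustration.

```csharp
// Assumed namespace, matching the package name.
using NPipeline.Connectors.Azure.CosmosDb;

// Hypothetical document type used for illustration only.
public sealed class Order
{
    public string Id { get; set; } = "";
    public string CustomerId { get; set; } = "";
    public string Status { get; set; } = "";
}

// Hypothetical factory; the pool is injected rather than constructed here.
public sealed class PendingOrdersSourceFactory
{
    private readonly ICosmosConnectionPool _pool;

    public PendingOrdersSourceFactory(ICosmosConnectionPool pool) => _pool = pool;

    // Uses the connection-pool constructor with the same query as the example above.
    public CosmosSourceNode<Order> Create() =>
        new CosmosSourceNode<Order>(
            _pool,
            "orders-db", "orders",
            "SELECT * FROM c WHERE c.status = 'pending'");
}
```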

Sink Node - CosmosSinkNode<T> (SQL API)

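| Strategy | Description | Best For |
| --- | --- | --- |
| Upsert (default) | Per-item upsert | Idempotent writes |
| Insert | Per-item insert | Append-only |
| Batch | Batched operations | Related items in same partition |
| TransactionalBatch | Transactional batch | Atomicity within a partition |
| Bulk | Bulk execution | Maximum throughput |

Upsert is the default for CosmosConfiguration.WriteStrategy, whereas the constructor signatures below declare CosmosWriteStrategy.Batch as the default for the writeStrategy parameter. Pass writeStrategy explicitly when the choice matters.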

Constructors

```csharp
// Connection string
public CosmosSinkNode(
    string connectionString,
    string databaseId, string containerId,
    CosmosWriteStrategy writeStrategy = CosmosWriteStrategy.Batch,
    Func<T, string>? idSelector = null,
    Func<T, PartitionKey>? partitionKeySelector = null,
    CosmosConfiguration? configuration = null)

// Connection pool (recommended for DI)
public CosmosSinkNode(
    ICosmosConnectionPool connectionPool,
    string databaseId, string containerId,
    CosmosWriteStrategy writeStrategy = CosmosWriteStrategy.Batch,
    Func<T, string>? idSelector = null,
    Func<T, PartitionKey>? partitionKeySelector = null,
    CosmosConfiguration? configuration = null,
    string? connectionName = null)
```
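
To illustrate the selector parameters, here is a hedged sketch of a sink that upserts orders. It assumes that PartitionKey in the signatures above is Microsoft.Azure.Cosmos.PartitionKey, that the container is partitioned on /customerId, and that the types live in a namespace matching the package name. Order is a hypothetical type.

```csharp
using Microsoft.Azure.Cosmos;              // PartitionKey (assumed to be the SDK type)
using NPipeline.Connectors.Azure.CosmosDb; // assumed namespace, matching the package name

var sink = new CosmosSinkNode<Order>(
    "AccountEndpoint=https://mydb.documents.azure.com:443/;AccountKey=...",
    "orders-db", "orders",
    CosmosWriteStrategy.Upsert,                          // stated explicitly rather than relying on a default
    idSelector: order => order.Id,                       // value used as the document id
    partitionKeySelector: order => new PartitionKey(order.CustomerId));

// Hypothetical document type used for illustration only.
public sealed class Order
{
    public string Id { get; set; } = "";
    public string CustomerId { get; set; } = "";
}
```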

Authentication

| Mode | Description |
| --- | --- |
| ConnectionString (default) | Standard Cosmos DB connection string |
| AccountEndpointAndKey | Explicit endpoint + key |
| AzureAdCredential | Azure AD / Managed Identity (recommended for production) |

```csharp
// Azure AD authentication
var config = new CosmosConfiguration
{
    AuthenticationMode = CosmosAuthenticationMode.AzureAdCredential,
    AccountEndpoint = "https://mydb.documents.azure.com:443/"
};
```

Configuration

Connection

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| ApiType | CosmosApiType | Sql | Sql, Mongo, or Cassandra |
| ConnectionString | string | "" | Connection string |
| AccountEndpoint | string | "" | Account endpoint URL |
| DatabaseId | string | "" | Database ID (required) |
| ContainerId | string? | null | Container/collection ID |
| AuthenticationMode | CosmosAuthenticationMode | ConnectionString | Authentication mode |
| ConsistencyLevel | ConsistencyLevel? | null | Consistency level override |
| PreferredRegions | List<string> | [] | Preferred regions for geo-replicated accounts |
| UseGatewayMode | bool | false | Use gateway mode (vs direct) |

Write

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| WriteStrategy | CosmosWriteStrategy | Upsert | Write strategy |
| BatchSize | int | 100 | Batch size |
| UseTransactionalBatch | bool | true | Use transactional batches |
| AllowBulkExecution | bool | false | Enable bulk execution mode |
| MaxConcurrentOperations | int | 500 | Max concurrent operations |
| PartitionKeyPath | string | "/id" | Partition key path |
| AutoCreateContainer | bool | false | Auto-create container if missing |
| Throughput | int? | null | Provisioned RU/s for auto-created containers |

Read

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| MaxItemCount | int | -1 | Max items per page (-1 = server default) |
| EnableCrossPartitionQuery | bool | true | Allow cross-partition queries |
| StreamResults | bool | false | Stream results |
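
As a rough illustration of how the read settings might be combined, the sketch below passes a CosmosConfiguration to the source node through the configuration parameter shown under Constructors. The property names come from the table above; the chosen values, the namespace, and the Order type are assumptions.

```csharp
using NPipeline.Connectors.Azure.CosmosDb; // assumed namespace, matching the package name

var readConfig = new CosmosConfiguration
{
    MaxItemCount = 500,                // cap each page at 500 items instead of the server default
    EnableCrossPartitionQuery = false, // the query below targets a single customer
    StreamResults = true               // see the Read table for this flag
};

var source = new CosmosSourceNode<Order>(
    "AccountEndpoint=https://mydb.documents.azure.com:443/;AccountKey=...",
    "orders-db", "orders",
    "SELECT * FROM c WHERE c.customerId = 'customer-42'",
    configuration: readConfig);

// Hypothetical document type used for illustration only.
public sealed class Order
{
    public string Id { get; set; } = "";
    public string CustomerId { get; set; } = "";
}
```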

Dependency Injection

```csharp
// Connection string
services.AddCosmosDbConnector("AccountEndpoint=...;AccountKey=...");

// Azure AD with endpoint
services.AddCosmosDbConnector(
    new Uri("https://mydb.documents.azure.com:443/"),
    new DefaultAzureCredential());

// Full options
services.AddCosmosDbConnector(options =>
{
    options.DefaultConnectionString = "AccountEndpoint=...;AccountKey=...";
    options.DefaultConfiguration = new CosmosConfiguration
    {
        WriteStrategy = CosmosWriteStrategy.Bulk,
        AllowBulkExecution = true
    };
});
```

API Types

| API | Description |
| --- | --- |
| Sql (default) | Cosmos DB SQL (Core) API |
| Mongo | MongoDB API compatibility |
| Cassandra | Cassandra API compatibility |

Partition Keys

Partition key selection is critical for Cosmos DB performance:

```csharp
var config = new CosmosConfiguration
{
    PartitionKeyPath = "/customerId",
    EnableCrossPartitionQuery = true  // required for queries without partition key filter
};
```

Cross-partition queries fan out to every partition, which increases both latency and RU consumption. Filter on the partition key whenever possible.
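
The difference is easiest to see in the query text. The sketch below assumes a container partitioned on /customerId, as in the configuration above. The customer ID, the namespace, and the Order type are illustrative assumptions.

```csharp
using NPipeline.Connectors.Azure.CosmosDb; // assumed namespace, matching the package name

const string connectionString = "AccountEndpoint=https://mydb.documents.azure.com:443/;AccountKey=...";

// No partition key filter: the query has to visit every partition.
var allPending = new CosmosSourceNode<Order>(
    connectionString,
    "orders-db", "orders",
    "SELECT * FROM c WHERE c.status = 'pending'");

// Equality filter on the partition key: the query can be served by one partition.
var pendingForCustomer = new CosmosSourceNode<Order>(
    connectionString,
    "orders-db", "orders",
    "SELECT * FROM c WHERE c.customerId = 'customer-42' AND c.status = 'pending'");

// Hypothetical document type used for illustration only.
public sealed class Order
{
    public string Id { get; set; } = "";
    public string CustomerId { get; set; } = "";
    public string Status { get; set; } = "";
}
```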

Consistency Levels

| Level | Latency | Consistency | RU Cost |
| --- | --- | --- | --- |
| Strong | High | Linearizable | High |
| BoundedStaleness | Medium | Bounded lag | Medium |
| Session (default) | Low | Read-your-writes | Low |
| ConsistentPrefix | Low | Ordered | Low |
| Eventual | Lowest | No ordering guarantee | Lowest |

```csharp
config.ConsistencyLevel = ConsistencyLevel.Session;
```

RU/Throughput Management

```csharp
var config = new CosmosConfiguration
{
    AutoCreateContainer = true,
    Throughput = 4000,                  // provisioned RU/s
    AllowBulkExecution = true,          // enable SDK bulk mode
    MaxConcurrentOperations = 500       // concurrent operations limit
};
```
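
When choosing a Throughput value, a back-of-the-envelope estimate can help: the sustained operation rate is roughly the provisioned RU/s divided by the RU charge of one operation. The helper below is a hypothetical sketch of that arithmetic and is not part of the connector. The per-operation charge is an input you would measure for your own documents.

```csharp
using System;

// Hypothetical helper for illustration; not part of NPipeline or the Cosmos SDK.
public static class ThroughputEstimate
{
    // Sustained operations per second that fit within the provisioned RU/s budget.
    public static double MaxOpsPerSecond(int provisionedRuPerSecond, double ruPerOperation)
    {
        if (provisionedRuPerSecond < 0)
            throw new ArgumentOutOfRangeException(nameof(provisionedRuPerSecond));
        if (ruPerOperation <= 0)
            throw new ArgumentOutOfRangeException(nameof(ruPerOperation));

        return provisionedRuPerSecond / ruPerOperation;
    }
}
```

For example, with 4000 RU/s provisioned and an assumed charge of 10 RU per write, ThroughputEstimate.MaxOpsPerSecond(4000, 10.0) evaluates to 400 writes per second. Sustained load above that rate would be expected to produce 429 responses.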

Bulk Execution

Set AllowBulkExecution = true for high-throughput writes. The Cosmos SDK then automatically batches operations by partition and parallelizes across partitions, trading higher per-operation latency for overall throughput.
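
A bulk-oriented sink might be wired up as in the sketch below. It mirrors the Dependency Injection example by setting both WriteStrategy and AllowBulkExecution. Whether both are required is not confirmed here, and the concurrency value, namespace, PartitionKey type, and Order type are assumptions.

```csharp
using Microsoft.Azure.Cosmos;              // PartitionKey (assumed to be the SDK type)
using NPipeline.Connectors.Azure.CosmosDb; // assumed namespace, matching the package name

var bulkSink = new CosmosSinkNode<Order>(
    "AccountEndpoint=...;AccountKey=...",
    "orders-db", "orders",
    CosmosWriteStrategy.Bulk,
    partitionKeySelector: order => new PartitionKey(order.CustomerId),
    configuration: new CosmosConfiguration
    {
        WriteStrategy = CosmosWriteStrategy.Bulk,
        AllowBulkExecution = true,
        MaxConcurrentOperations = 100 // illustrative value; lower it if 429 responses appear
    });

// Hypothetical document type used for illustration only.
public sealed class Order
{
    public string Id { get; set; } = "";
    public string CustomerId { get; set; } = "";
}
```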

Change Feed

Read the Cosmos DB change feed for CDC-style processing:

```csharp
var source = new CosmosChangeFeedSourceNode<Order>(new CosmosConfiguration
{
    ConnectionString = "...",
    DatabaseId = "sales",
    ContainerId = "orders",
    ChangeFeedLeaseContainerId = "leases",
    ChangeFeedStartFrom = ChangeFeedStartFrom.Beginning()
});
```

The change feed provides an ordered stream of changes within each partition key. Ordering across partition keys is not guaranteed.

Best Practices

  1. Use Azure AD auth in production - avoid connection strings with keys
  2. Choose partition keys that distribute load evenly and match query patterns
  3. Avoid cross-partition queries in hot paths - filter by partition key
  4. Use bulk execution for high-throughput writes
  5. Use Session consistency unless you need stronger guarantees
  6. Set MaxConcurrentOperations to avoid throttling (429 responses)
  7. Use direct mode (default) - gateway mode adds a network hop
  8. Configure PreferredRegions for geo-replicated accounts
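
Several of these recommendations can be expressed in a single configuration object. The following is a sketch using only properties listed under Configuration. The region names and concurrency value are placeholders, and it assumes that ConsistencyLevel is the Microsoft.Azure.Cosmos enum and that PreferredRegions is settable.

```csharp
using System.Collections.Generic;
using Microsoft.Azure.Cosmos;              // ConsistencyLevel (assumed to be the SDK enum)
using NPipeline.Connectors.Azure.CosmosDb; // assumed namespace, matching the package name

var productionConfig = new CosmosConfiguration
{
    // 1. Azure AD instead of key-based connection strings
    AuthenticationMode = CosmosAuthenticationMode.AzureAdCredential,
    AccountEndpoint = "https://mydb.documents.azure.com:443/",

    // 5. Session consistency unless stronger guarantees are needed
    ConsistencyLevel = ConsistencyLevel.Session,

    // 6. Bound concurrency to reduce throttling (placeholder value)
    MaxConcurrentOperations = 100,

    // 7. Direct mode (the default), stated explicitly
    UseGatewayMode = false,

    // 8. Placeholder regions, listed in order of preference
    PreferredRegions = new List<string> { "West Europe", "North Europe" }
};
```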

Released under the MIT License.