
Kafka Connector

The NPipeline.Connectors.Kafka package provides source and sink nodes for Apache Kafka. It supports consumer groups, exactly-once transactional semantics, multiple serialization formats (JSON, Avro, and Protobuf) with Schema Registry integration, configurable acknowledgment strategies, and parallel processing.

Installation

```bash
dotnet add package NPipeline.Connectors.Kafka
```

Dependencies: Confluent.Kafka 2.x and Confluent.SchemaRegistry 2.x. The Avro and Protobuf serializers (Confluent.SchemaRegistry.Serdes.Avro and Confluent.SchemaRegistry.Serdes.Protobuf) are optional.

Source Node - KafkaSourceNode<T>

Constructors

```csharp
public KafkaSourceNode(KafkaConfiguration configuration)

public KafkaSourceNode(
    KafkaConfiguration configuration,
    IKafkaMetrics metrics,
    IRetryStrategy retryStrategy)

// Bring your own consumer
public KafkaSourceNode(
    IConsumer<string, T> consumer,
    KafkaConfiguration configuration,
    IKafkaMetrics metrics,
    IRetryStrategy retryStrategy)
```
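The third constructor accepts a consumer you build yourself, which helps when you need Confluent.Kafka settings that KafkaConfiguration does not expose. The sketch below builds one with Confluent.Kafka's ConsumerBuilder and a System.Text.Json value deserializer. JsonValueDeserializer and SourceFactory are hypothetical helpers, the NPipeline namespaces are not given on this page, and you still need your own IKafkaMetrics and IRetryStrategy instances. Whether the node subscribes the consumer to SourceTopic for you is also not stated here, so check that before relying on it.

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;
// Add the NPipeline using directives for KafkaSourceNode<T>, KafkaConfiguration,
// IKafkaMetrics and IRetryStrategy (namespaces are not shown on this page).

// Hypothetical value deserializer: decodes UTF-8 JSON with System.Text.Json.
public sealed class JsonValueDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context) =>
        isNull ? default! : JsonSerializer.Deserialize<T>(data)!;
}

public static class SourceFactory
{
    public static KafkaSourceNode<Order> Create(
        KafkaConfiguration config, IKafkaMetrics metrics, IRetryStrategy retryStrategy)
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = config.BootstrapServers,
            GroupId = config.ConsumerGroupId,
            AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest,
            // Assumption: offsets are committed through the connector's acknowledgment.
            EnableAutoCommit = false
        };

        // string keys use Confluent.Kafka's built-in UTF-8 deserializer by default.
        IConsumer<string, Order> consumer = new ConsumerBuilder<string, Order>(consumerConfig)
            .SetValueDeserializer(new JsonValueDeserializer<Order>())
            .Build();

        return new KafkaSourceNode<Order>(consumer, config, metrics, retryStrategy);
    }
}
```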

Example

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SourceTopic = "orders",
    ConsumerGroupId = "order-processor",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    SerializationFormat = SerializationFormat.Json
};

var source = new KafkaSourceNode<Order>(config);
```
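The examples use an Order type that this page does not define. As an illustrative assumption, a plain record like the one below is a reasonable shape for the default JSON format. The round trip uses System.Text.Json; whether the connector's JSON serializer is System.Text.Json is not stated here.

```csharp
using System.Text.Json;

// Hypothetical message type matching the examples on this page
// (the OrderPartitionProvider example reads CustomerId).
public sealed record Order(string OrderId, int CustomerId, decimal Total);

public static class OrderJson
{
    // Serialize to UTF-8 JSON and back; public properties are written by name.
    public static Order RoundTrip(Order order)
    {
        byte[] utf8 = JsonSerializer.SerializeToUtf8Bytes(order);
        return JsonSerializer.Deserialize<Order>(utf8)!;
    }
}
```

Because records compare by value, a successful round trip returns an Order equal to the original.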

Sink Node - KafkaSinkNode<T>

Constructors

```csharp
public KafkaSinkNode(KafkaConfiguration configuration)

public KafkaSinkNode(
    KafkaConfiguration configuration,
    IKafkaMetrics metrics,
    IRetryStrategy retryStrategy,
    IPartitionKeyProvider<T>? partitionKeyProvider = null)
```
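The four-argument constructor is where a custom partition key provider plugs in. A sketch, assuming config, metrics and retryStrategy already exist (this page does not name concrete IKafkaMetrics or IRetryStrategy implementations) and reusing the OrderPartitionProvider from the Partitioning section:

```csharp
// Assumes `config` (KafkaConfiguration), `metrics` (IKafkaMetrics) and
// `retryStrategy` (IRetryStrategy) are already in scope.
var sink = new KafkaSinkNode<Order>(
    config,
    metrics,
    retryStrategy,
    partitionKeyProvider: new OrderPartitionProvider());
```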

Example

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SinkTopic = "processed-orders",
    EnableIdempotence = true,
    Acks = Acks.All,
    SerializationFormat = SerializationFormat.Json
};

var sink = new KafkaSinkNode<ProcessedOrder>(config);
```

Configuration

Connection & Security

| Property | Type | Default | Description |
|---|---|---|---|
| BootstrapServers | string | - | Broker addresses (comma-separated) |
| ClientId | string? | null | Client identifier |
| SecurityProtocol | SecurityProtocol | Plaintext | Plaintext, Ssl, SaslPlaintext, or SaslSsl |
| SaslMechanism | SaslMechanism | Plain | Plain, ScramSha256, ScramSha512, or OAuthBearer |
| SaslUsername | string? | null | SASL username |
| SaslPassword | string? | null | SASL password |
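For a secured cluster, the connection properties combine as in the sketch below. The broker addresses and the KAFKA_USERNAME and KAFKA_PASSWORD environment variable names are placeholders; reading the credentials from the environment simply keeps them out of source code.

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "broker-1:9093,broker-2:9093",  // placeholder hosts
    ClientId = "order-processor",
    SecurityProtocol = SecurityProtocol.SaslSsl,        // SASL authentication over TLS
    SaslMechanism = SaslMechanism.ScramSha512,
    SaslUsername = Environment.GetEnvironmentVariable("KAFKA_USERNAME"), // hypothetical variable
    SaslPassword = Environment.GetEnvironmentVariable("KAFKA_PASSWORD"), // hypothetical variable
    SourceTopic = "orders",
    ConsumerGroupId = "order-processor"
};
```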

Consumer (Source)

| Property | Type | Default | Description |
|---|---|---|---|
| SourceTopic | string | - | Topic to consume from |
| ConsumerGroupId | string | - | Consumer group ID |
| GroupInstanceId | string? | null | Static group membership ID |
| AutoOffsetReset | AutoOffsetReset | Latest | Earliest, Latest, or Error |
| EnableAutoCommit | bool | - | Enable auto-commit |
| MaxPollRecords | int | 500 | Max records per poll |
| PollTimeoutMs | int | 100 | Poll timeout (ms) |
| FetchMinBytes | int | 1 | Min bytes to fetch |
| FetchMaxBytes | int | 52428800 | Max bytes to fetch (50 MiB) |
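A consumer configured for static group membership and smaller polls might look like the sketch below. GroupInstanceId must be unique per running instance and stable across restarts, so that an instance restarting within the session timeout can rejoin without triggering a rebalance. Deriving it from the machine name is only one option and assumes one instance per machine; the MaxPollRecords and PollTimeoutMs values are illustrative.

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SourceTopic = "orders",
    ConsumerGroupId = "order-processor",
    // Static membership: unique per instance, stable across restarts.
    GroupInstanceId = $"order-processor-{Environment.MachineName}",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    MaxPollRecords = 200,  // smaller batches than the default 500
    PollTimeoutMs = 250
};
```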

Producer (Sink)

| Property | Type | Default | Description |
|---|---|---|---|
| SinkTopic | string | - | Topic to produce to |
| EnableIdempotence | bool | true | Idempotent producer |
| Acks | Acks | All | None, Leader, or All |
| BatchSize | int | 16384 | Producer batch size (bytes) |
| LingerMs | int | 5 | Time to wait before sending a batch (ms) |
| CompressionType | CompressionType | None | None, Gzip, Snappy, Lz4, or Zstd |
| MessageMaxBytes | int | 1000000 | Max message size (bytes) |
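A throughput-oriented producer built only from the properties above might look like the following. The specific values are illustrative starting points, not recommendations from this page; raising LingerMs trades a few milliseconds of latency for fuller batches.

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SinkTopic = "processed-orders",
    EnableIdempotence = true,               // default; prevents duplicates from producer retries
    Acks = Acks.All,                        // default; wait for all in-sync replicas
    CompressionType = CompressionType.Lz4,
    LingerMs = 20,                          // wait up to 20 ms to fill a batch
    BatchSize = 65536                       // 64 KiB batches instead of the 16 KiB default
};
```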

Serialization

| Property | Type | Default | Description |
|---|---|---|---|
| SerializationFormat | SerializationFormat | Json | Json, Avro, or Protobuf |
| SchemaRegistry | SchemaRegistryConfiguration? | null | Schema Registry settings (required for Avro/Protobuf) |

Delivery Semantics

| Property | Type | Default | Description |
|---|---|---|---|
| DeliverySemantic | DeliverySemantic | AtLeastOnce | AtLeastOnce or ExactlyOnce |
| AcknowledgmentStrategy | AcknowledgmentStrategy | AutoOnSinkSuccess | When to acknowledge messages |
| EnableTransactions | bool | - | Enable transactional producer |
| TransactionalId | string? | null | Transactional ID (required for exactly-once) |


Serialization Formats

| Format | Dependency | Schema | Best For |
|---|---|---|---|
| Json (default) | - | None | Simple messages, debugging |
| Avro | Confluent.SchemaRegistry.Serdes.Avro | Schema Registry | Schema evolution, compact encoding |
| Protobuf | Confluent.SchemaRegistry.Serdes.Protobuf | Schema Registry | Cross-language, compact encoding |
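Avro and Protobuf values written through Confluent's Schema Registry serializers use the Confluent wire format: a magic byte of 0, then a 4-byte big-endian schema ID, then the encoded payload (Protobuf also inserts message indexes before the payload). When debugging a topic, a small helper like the hypothetical one below can tell a registry-framed value from plain JSON. It is a sketch that reads only the header.

```csharp
using System;
using System.Buffers.Binary;

public static class ConfluentWireFormat
{
    // Returns true and the schema ID when the value starts with the Confluent
    // header: magic byte 0 followed by a 4-byte big-endian schema ID.
    public static bool TryReadSchemaId(ReadOnlySpan<byte> value, out int schemaId)
    {
        schemaId = 0;
        if (value.Length < 5 || value[0] != 0)
            return false;

        schemaId = BinaryPrimitives.ReadInt32BigEndian(value.Slice(1, 4));
        return true;
    }
}
```

A JSON value starts with a character such as `{` rather than a zero byte, so the helper returns false for it.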

Schema Registry

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SerializationFormat = SerializationFormat.Avro,
    SchemaRegistry = new SchemaRegistryConfiguration
    {
        Url = "http://localhost:8081",
        AutoRegisterSchemas = true,
        SchemaCacheCapacity = 1000
    }
};
```

Delivery Semantics

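| Semantic | Description | Configuration |
|---|---|---|
| AtLeastOnce (default) | No data loss; duplicates possible | Default; AcknowledgeAsync() commits the offset |
| ExactlyOnce | No data loss and no duplicates within Kafka (side effects outside Kafka can still repeat) | Requires a transactional producer |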

At-Least-Once

```csharp
// Default: the offset is committed when AcknowledgeAsync() is called
await message.AcknowledgeAsync(ct);
```

Exactly-Once (Transactional)

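```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    DeliverySemantic = DeliverySemantic.ExactlyOnce,
    EnableTransactions = true,
    TransactionalId = "order-processor-1", // unique per producer instance, stable across restarts
    EnableIdempotence = true,
    Acks = Acks.All,
    IsolationLevel = IsolationLevel.ReadCommitted // read only committed transactional messages
};
```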

With exactly-once semantics, AcknowledgeAsync() is a no-op: the sink commits the consumed offsets as part of its transaction.
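For intuition about what committing offsets inside a transaction means, the sketch below shows the general Kafka consume-transform-produce loop written directly against Confluent.Kafka. It is not NPipeline's implementation. It commits one transaction per message for clarity (real code batches several messages per transaction) and simplifies error handling, which in production has to distinguish retriable, abortable, and fatal errors. Topic names, the group ID, and the transactional ID are placeholders.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Generic Confluent.Kafka transactional loop; illustrates the Kafka mechanism only.
public static class TransactionalForwarder
{
    public static void Run(string bootstrapServers, string inputTopic, string outputTopic, CancellationToken ct)
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = "order-processor",
            EnableAutoCommit = false,                   // offsets are committed by the transaction
            IsolationLevel = IsolationLevel.ReadCommitted
        };
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            TransactionalId = "order-processor-1",      // unique per producer instance
            EnableIdempotence = true
        };

        using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
        using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
        consumer.Subscribe(inputTopic);
        producer.InitTransactions(TimeSpan.FromSeconds(30));

        while (!ct.IsCancellationRequested)
        {
            var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
            if (result is null) continue;               // poll timed out

            producer.BeginTransaction();
            try
            {
                // The "transform" is a pass-through here.
                producer.Produce(outputTopic, new Message<string, string>
                {
                    Key = result.Message.Key,
                    Value = result.Message.Value
                });

                // Commit the next offset to read, atomically with the produced record.
                producer.SendOffsetsToTransaction(
                    new[] { new TopicPartitionOffset(result.TopicPartition, result.Offset.Value + 1) },
                    consumer.ConsumerGroupMetadata,
                    TimeSpan.FromSeconds(30));
                producer.CommitTransaction();
            }
            catch (KafkaException)
            {
                producer.AbortTransaction();
                throw;                                  // simplified: stop instead of retrying
            }
        }
    }
}
```

Because the produced record and the consumer offset commit together or not at all, a crash between the two cannot leave the output written while the input is still marked unread.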

Acknowledgment Strategies

| Strategy | Description |
|---|---|
| AutoOnSinkSuccess (default) | Offset committed after successful sink processing |
| Manual | Call message.AcknowledgeAsync() explicitly |
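With the Manual strategy, your code decides when a message is acknowledged. A sketch, assuming a KafkaMessage<Order> named message and a CancellationToken ct are in scope inside your processing step (the surrounding node API is not covered on this page), and that HandleOrderAsync is your own hypothetical handler:

```csharp
var config = new KafkaConfiguration
{
    BootstrapServers = "localhost:9092",
    SourceTopic = "orders",
    ConsumerGroupId = "order-processor",
    AcknowledgmentStrategy = AcknowledgmentStrategy.Manual
};

// Inside your processing step:
await HandleOrderAsync(message.Body, ct); // hypothetical handler
await message.AcknowledgeAsync(ct);       // commit the offset only after success
```

If the handler throws, the offset is not committed, so the message is delivered again later; that is the at-least-once trade-off.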

Message Metadata

KafkaMessage<T> exposes:

| Property | Type | Description |
|---|---|---|
| Body | T | Deserialized message value |
| Key | string? | Message key |
| Topic | string | Source topic |
| Partition | int | Partition number |
| Offset | long | Message offset |
| Timestamp | DateTimeOffset | Message timestamp |
| Headers | Headers | Kafka headers |
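Topic, Partition, and Offset together identify a message uniquely, which makes them useful for idempotent handling under at-least-once delivery: a redelivered message carries the same coordinates. The hypothetical class below, which is not part of the connector, skips any offset at or below the last one handled for its partition. It assumes each partition is handled in offset order. The dictionary stands in for durable storage; to survive a crash, the mark has to be persisted in the same transaction as the handler's side effect.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: per-partition high-water marks for idempotent handling.
public sealed class PartitionHighWaterMarks
{
    private readonly Dictionary<(string Topic, int Partition), long> _lastHandled = new();

    // True if the offset is newer than anything already handled for its partition.
    public bool ShouldProcess(string topic, int partition, long offset) =>
        !_lastHandled.TryGetValue((topic, partition), out long last) || offset > last;

    // Records a handled offset; a mark never moves backwards.
    public void MarkHandled(string topic, int partition, long offset)
    {
        var key = (topic, partition);
        _lastHandled[key] = _lastHandled.TryGetValue(key, out long last)
            ? Math.Max(last, offset)
            : offset;
    }
}
```

In a handler you would call ShouldProcess(message.Topic, message.Partition, message.Offset) before doing the work and MarkHandled with the same values afterwards.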

Partitioning

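Implement IPartitionKeyProvider<T> for custom partition routing. Messages with the same partition key are written to the same partition, so their relative order is preserved: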

```csharp
public class OrderPartitionProvider : IPartitionKeyProvider<Order>
{
    public string GetPartitionKey(Order item) => item.CustomerId.ToString();
}
```
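A simple hash-modulo mapping shows why a partition key keeps related messages together: the same key always lands on the same partition. The sketch uses 32-bit FNV-1a purely as a stand-in hash. Kafka clients use their own hash functions (the Java client uses murmur2, for example), so the partition numbers here will not match a real cluster.

```csharp
using System;
using System.Text;

// Illustration only: hash-modulo partitioning with FNV-1a as a stand-in hash.
public static class KeyPartitioning
{
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        uint hash = 2166136261u;                 // FNV-1a 32-bit offset basis
        foreach (byte b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash = unchecked(hash * 16777619u);  // FNV-1a 32-bit prime, wraps modulo 2^32
        }
        return (int)(hash % (uint)partitionCount);
    }
}
```

For example, KeyPartitioning.PartitionFor(new OrderPartitionProvider().GetPartitionKey(order), 6) always returns the same partition for a given customer. Because the result depends on the partition count, adding partitions to a topic moves keys to different partitions, which is also true of Kafka's own key-based partitioning.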

Dead-Letter Handling

Failed messages can be routed to a dead-letter topic via NPipeline's dead-letter mechanism:

```csharp
var config = new KafkaConfiguration
{
    DeadLetterTopic = "orders-dlq",
    MaxDeliveryAttempts = 3
};
```
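The interplay between MaxDeliveryAttempts and the dead-letter topic can be sketched as "retry, then route". This assumes MaxDeliveryAttempts counts every attempt including the first, which this page does not state. DeadLetterSketch is hypothetical, and a collection stands in for the orders-dlq topic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical retry-then-dead-letter sketch; the connector's exact rule may differ.
public static class DeadLetterSketch
{
    public static async Task<bool> HandleAsync<T>(
        T message,
        Func<T, Task> handler,
        int maxDeliveryAttempts,
        ICollection<T> deadLetters) // stands in for the dead-letter topic
    {
        if (maxDeliveryAttempts < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDeliveryAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await handler(message);
                return true; // processed successfully
            }
            catch (Exception) when (attempt < maxDeliveryAttempts)
            {
                // Attempts remain: try again.
            }
            catch (Exception)
            {
                deadLetters.Add(message); // final attempt failed: route to dead letters
                return false;
            }
        }
    }
}
```

With maxDeliveryAttempts set to 3, a handler that always fails runs three times before the message is dead-lettered, while one that succeeds on the third try is never dead-lettered.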

Best Practices

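  1. Use Acks.All together with EnableIdempotence (both are the defaults) for durable writes without duplicates from producer retries
  2. Use one ConsumerGroupId per logical consumer; instances that share a group ID split the topic's partitions between them, which enables parallel processing (at most one active consumer per partition)
  3. Use Avro or Protobuf with Schema Registry when you need schema evolution
  4. Tune MaxPollRecords to control batch sizes (default 500)
  5. Monitor through IKafkaMetrics, which tracks consume and produce rates, lag, and errors
  6. Use CompressionType.Lz4 for high-throughput topics
  7. Set LingerMs between 5 and 50 to batch small messages for better throughput, at the cost of some added latency
  8. Use exactly-once semantics only when you need them; transactions add overhead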


Released under the MIT License.