RabbitMQ Connector

The NPipeline.Connectors.RabbitMQ package provides source and sink nodes for RabbitMQ. It supports quorum queues, QoS prefetch, publisher confirms, batch publishing, TLS, automatic topology declaration, dead-letter exchanges, and poison message detection.

Installation

bash
dotnet add package NPipeline.Connectors.RabbitMQ

Dependencies: RabbitMQ.Client 7.x

Source Node - RabbitMqSourceNode<T>

Constructor

csharp
public RabbitMqSourceNode(
    RabbitMqSourceOptions options,
    IRabbitMqConnectionManager connectionManager,
    IMessageSerializer serializer,
    IRabbitMqMetrics? metrics = null,
    ILogger<RabbitMqSourceNode<T>>? logger = null)

Example

csharp
var sourceOptions = new RabbitMqSourceOptions("order-queue")
{
    PrefetchCount = 100,
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,
    MaxDeliveryAttempts = 5,
    Topology = new RabbitMqTopologyOptions
    {
        AutoDeclare = true,
        QueueType = QueueType.Quorum,
        DeadLetterExchange = "dlx"
    }
};

Sink Node - RabbitMqSinkNode<T>

Constructor

csharp
public RabbitMqSinkNode(
    RabbitMqSinkOptions options,
    IRabbitMqConnectionManager connectionManager,
    IMessageSerializer serializer,
    IRabbitMqMetrics? metrics = null,
    ILogger<RabbitMqSinkNode<T>>? logger = null)

Example

csharp
var sinkOptions = new RabbitMqSinkOptions("order-exchange")
{
    RoutingKey = "processed",
    EnablePublisherConfirms = true,
    Persistent = true,
    Batching = new BatchPublishOptions
    {
        BatchSize = 100,
        LingerTime = TimeSpan.FromMilliseconds(50)
    }
};

Configuration

Connection - RabbitMqConnectionOptions

| Property | Type | Default | Description |
|---|---|---|---|
| HostName | string | "localhost" | RabbitMQ server hostname |
| Port | int | 5672 | AMQP port |
| VirtualHost | string | "/" | Virtual host |
| UserName | string | "guest" | Username |
| Password | string | "guest" | Password |
| Uri | Uri? | null | Full AMQP URI (overrides individual settings) |
| AutomaticRecoveryEnabled | bool | true | Auto-reconnect on failure |
| RequestedHeartbeat | TimeSpan | 60s | Heartbeat interval |
| MaxChannelPoolSize | int | 4 | Max pooled channels |

TLS - RabbitMqTlsOptions

csharp
using System.Security.Authentication; // provides SslProtocols

var connection = new RabbitMqConnectionOptions
{
    HostName = "rabbitmq.example.com",
    Port = 5671,
    Tls = new RabbitMqTlsOptions
    {
        Enabled = true,
        ServerName = "rabbitmq.example.com",
        CertificatePath = "/path/to/client.pfx",
        SslProtocols = SslProtocols.Tls12
    }
};

Source - RabbitMqSourceOptions

| Property | Type | Default | Description |
|---|---|---|---|
| QueueName | string | (required) | Queue to consume from |
| PrefetchCount | ushort | 100 | QoS prefetch count |
| AcknowledgmentStrategy | AcknowledgmentStrategy | AutoOnSinkSuccess | When to ACK messages |
| RequeueOnNack | bool | true | Requeue rejected messages |
| MaxDeliveryAttempts | int? | 5 | Poison message threshold |
| RejectOnMaxDeliveryAttempts | bool | true | Reject after max attempts |
| ConsumerDispatchConcurrency | int | 1 | Concurrent dispatch |
| InternalBufferCapacity | int | 1000 | Internal buffer size |

Sink - RabbitMqSinkOptions

| Property | Type | Default | Description |
|---|---|---|---|
| ExchangeName | string | (required) | Exchange to publish to |
| RoutingKey | string | "" | Default routing key |
| RoutingKeySelector | Func<object, string>? | null | Per-message routing key |
| EnablePublisherConfirms | bool | true | Wait for broker confirmation |
| Persistent | bool | true | Mark messages as persistent |
| Mandatory | bool | false | Require at least one queue binding |

Batch Publishing - BatchPublishOptions

| Property | Type | Default | Description |
|---|---|---|---|
| BatchSize | int | 100 | Messages per batch |
| LingerTime | TimeSpan | 50ms | Time to wait before sending partial batch |
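
The interaction between BatchSize and LingerTime can be illustrated with a small, self-contained sketch. It is not the connector's implementation: CollectBatchesAsync is a hypothetical helper built on System.Threading.Channels, and it assumes the linger timer starts when a batch is opened. A batch is flushed when it reaches the size limit or when the linger time elapses, whichever comes first.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not part of NPipeline: groups items into batches that are
// flushed when batchSize is reached or when lingerTime elapses, whichever is first.
static async Task<List<List<T>>> CollectBatchesAsync<T>(
    ChannelReader<T> reader, int batchSize, TimeSpan lingerTime)
{
    var batches = new List<List<T>>();

    // Wait (without a timer) until at least one item is available.
    while (await reader.WaitToReadAsync())
    {
        var batch = new List<T>(batchSize);

        // Assumption: the linger timer starts when the batch is opened.
        using var linger = new CancellationTokenSource(lingerTime);
        try
        {
            while (batch.Count < batchSize)
            {
                batch.Add(await reader.ReadAsync(linger.Token));
            }
        }
        catch (OperationCanceledException) { } // linger elapsed: flush a partial batch
        catch (ChannelClosedException) { }     // input completed: flush what is left

        if (batch.Count > 0) batches.Add(batch);
    }

    return batches;
}

// Five messages with a batch size of two: two full batches and one partial batch.
var queue = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 5; i++) queue.Writer.TryWrite(i);
queue.Writer.Complete();

var batches = await CollectBatchesAsync(
    queue.Reader, batchSize: 2, lingerTime: TimeSpan.FromSeconds(1));

Console.WriteLine(string.Join(", ", batches.Select(b => b.Count))); // prints "2, 2, 1"
```

A larger BatchSize amortizes per-publish overhead, while a shorter LingerTime bounds the extra latency a message can pick up while waiting for its batch to fill.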

Topology - RabbitMqTopologyOptions

| Property | Type | Default | Description |
|---|---|---|---|
| AutoDeclare | bool | true | Auto-declare exchanges, queues, and bindings |
| QueueType | QueueType | Quorum | Classic, Quorum (recommended), or Stream |
| Durable | bool | true | Durable queue/exchange |
| DeadLetterExchange | string? | null | Dead-letter exchange name |
| DeadLetterRoutingKey | string? | null | Dead-letter routing key |
| MessageTtlMs | int? | null | Message TTL in milliseconds |
| MaxLength | int? | null | Max queue length (messages) |
| MaxLengthBytes | int? | null | Max queue size (bytes) |

Dependency Injection

csharp
services.AddRabbitMq(connection =>
{
    connection.HostName = "rabbitmq.example.com";
    connection.UserName = "app";
    connection.Password = "secret";
});

services.AddRabbitMqSource<Order>(new RabbitMqSourceOptions("order-queue")
{
    PrefetchCount = 200,
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess
});

services.AddRabbitMqSink<ProcessedOrder>(new RabbitMqSinkOptions("processed-exchange")
{
    RoutingKey = "orders.processed",
    EnablePublisherConfirms = true
});

Topology Auto-Declaration

When AutoDeclare = true (default), the connector creates exchanges, queues, and bindings on startup:

csharp
var topology = new RabbitMqTopologyOptions
{
    AutoDeclare = true,
    QueueType = QueueType.Quorum,
    Durable = true,
    DeadLetterExchange = "dlx",
    DeadLetterRoutingKey = "dead-letter"
};
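
For orientation, the sketch below shows the standard RabbitMQ queue arguments that settings like these correspond to on the broker side. The argument names (x-queue-type, x-dead-letter-exchange, and so on) are defined by RabbitMQ; BuildQueueArguments is a hypothetical helper, and whether the connector builds its declaration exactly this way is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of NPipeline: maps topology settings to the
// optional queue arguments that RabbitMQ understands at declaration time.
static Dictionary<string, object?> BuildQueueArguments(
    string queueType,
    string? deadLetterExchange = null,
    string? deadLetterRoutingKey = null,
    int? messageTtlMs = null,
    int? maxLength = null,
    int? maxLengthBytes = null)
{
    var args = new Dictionary<string, object?>
    {
        ["x-queue-type"] = queueType // "classic", "quorum", or "stream"
    };

    // Only settings that were actually provided become queue arguments.
    if (deadLetterExchange is not null) args["x-dead-letter-exchange"] = deadLetterExchange;
    if (deadLetterRoutingKey is not null) args["x-dead-letter-routing-key"] = deadLetterRoutingKey;
    if (messageTtlMs is not null) args["x-message-ttl"] = messageTtlMs.Value;
    if (maxLength is not null) args["x-max-length"] = maxLength.Value;
    if (maxLengthBytes is not null) args["x-max-length-bytes"] = maxLengthBytes.Value;

    return args;
}

// Mirrors the topology example: quorum queue with a dead-letter exchange and routing key.
var arguments = BuildQueueArguments(
    "quorum", deadLetterExchange: "dlx", deadLetterRoutingKey: "dead-letter");

foreach (var (key, value) in arguments)
{
    Console.WriteLine($"{key} = {value}");
}
```

RabbitMQ rejects a redeclaration whose arguments differ from those of an existing queue, so changing these settings for a queue that already exists generally means deleting and recreating it.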

Queue Types

| Type | Description |
|---|---|
| Classic | Traditional RabbitMQ queues |
| Quorum (default) | Replicated, fault-tolerant - recommended for production |
| Stream | Append-only log - for replay scenarios |

Dynamic Routing Keys

Route messages per-item using RoutingKeySelector:

csharp
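var sinkOptions = new RabbitMqSinkOptions("order-exchange")
{
    // The selector receives the message as object, so cast it to the item type first.
    RoutingKeySelector = message => $"orders.{((Order)message).Region.ToLowerInvariant()}"
};

// connectionManager and serializer are the dependencies required by the constructor.
var sink = new RabbitMqSinkNode<Order>(sinkOptions, connectionManager, serializer);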

Connection Management

  • Lazy connection: Connections are created on first use
  • Automatic recovery: The underlying RabbitMQ client reconnects on failure
  • Channel pooling: Channels are pooled and reused across operations

Push-to-Pull Bridge

RabbitMqSourceNode<T> internally bridges RabbitMQ's push-based consumer to NPipeline's pull-based model using a bounded Channel<T>:

csharp
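var sourceOptions = new RabbitMqSourceOptions("order-queue")
{
    InternalBufferCapacity = 1000,  // bounded channel capacity
    PrefetchCount = 200             // QoS prefetch
};

// connectionManager and serializer are the dependencies required by the constructor.
var source = new RabbitMqSourceNode<Order>(sourceOptions, connectionManager, serializer);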

If the buffer fills, backpressure propagates to RabbitMQ: the broker stops delivering messages until space is available in the buffer.
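
The bridging idea can be tried in isolation with System.Threading.Channels. The sketch below is a stand-in rather than the connector's code: a background task plays the role of the broker pushing deliveries, and the capacity of 2 is an arbitrary illustrative value in place of InternalBufferCapacity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded buffer standing in for InternalBufferCapacity (kept tiny for illustration).
var buffer = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.Wait, // a full buffer makes the writer wait
    SingleReader = true
});

// Push side: stands in for the broker invoking a consumer callback per delivery.
var pushSide = Task.Run(async () =>
{
    for (var delivery = 1; delivery <= 5; delivery++)
    {
        // WriteAsync completes only when there is room, so a slow reader
        // slows the producer down instead of letting memory grow without bound.
        await buffer.Writer.WriteAsync(delivery);
    }

    buffer.Writer.Complete(); // signal that no more deliveries will arrive
});

// Pull side: the pipeline requests items at its own pace.
var pulled = new List<int>();
await foreach (var item in buffer.Reader.ReadAllAsync())
{
    pulled.Add(item);
}

await pushSide;

Console.WriteLine(string.Join(", ", pulled)); // prints "1, 2, 3, 4, 5"
```

With BoundedChannelFullMode.Wait, no delivery is dropped when the reader falls behind; the writer simply waits until a slot frees up.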

Acknowledgment Strategies

| Strategy | Description |
|---|---|
| AutoOnSinkSuccess (default) | ACK after sink processing completes |
| Manual | Call message.AcknowledgeAsync() explicitly |

Poison Message Handling

csharp
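var sourceOptions = new RabbitMqSourceOptions("order-queue")
{
    MaxDeliveryAttempts = 5,
    RejectOnMaxDeliveryAttempts = true,  // NACK without requeue → goes to DLX
    RequeueOnNack = true                 // requeue on failure (before max attempts)
};

// connectionManager and serializer are the dependencies required by the constructor.
var source = new RabbitMqSourceNode<Order>(sourceOptions, connectionManager, serializer);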
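
How the three settings combine can be read as a small decision rule. The sketch below is one interpretation of the comments above, not the connector's code: ShouldRequeue is a hypothetical helper, and both the `>=` comparison and the behavior when RejectOnMaxDeliveryAttempts is false are assumptions.

```csharp
using System;

// Hypothetical helper, not part of NPipeline: decides whether a failed message
// is NACKed with requeue (true) or without requeue (false, dead-lettered if a
// DLX is configured on the queue).
static bool ShouldRequeue(
    int deliveryAttempt,
    int? maxDeliveryAttempts,
    bool rejectOnMaxDeliveryAttempts,
    bool requeueOnNack)
{
    // Assumption: the threshold is reached when the attempt number equals the limit.
    var limitReached = maxDeliveryAttempts is int max && deliveryAttempt >= max;

    if (limitReached && rejectOnMaxDeliveryAttempts)
    {
        return false; // poison message: stop redelivering it
    }

    return requeueOnNack; // ordinary failure: follow RequeueOnNack
}

// Same values as the options above: limit of 5, reject at the limit, requeue before it.
for (var attempt = 1; attempt <= 5; attempt++)
{
    var requeue = ShouldRequeue(attempt, 5, rejectOnMaxDeliveryAttempts: true, requeueOnNack: true);
    Console.WriteLine($"attempt {attempt}: requeue = {requeue}");
}
// Attempts 1 to 4 print "requeue = True"; attempt 5 prints "requeue = False".
```

Without a DeadLetterExchange configured on the queue, a message rejected without requeue is discarded by the broker, so the threshold and the DLX are best configured together.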

Observability

Implement IRabbitMqMetrics to collect connection, channel, publish, and consume metrics:

csharp
services.AddSingleton<IRabbitMqMetrics, MyRabbitMqMetrics>();

Best Practices

  1. Use quorum queues - fault-tolerant and recommended for production
  2. Enable publisher confirms - ensures messages reach the broker
  3. Configure a dead-letter exchange (DLX) for poison message handling
  4. Set PrefetchCount proportional to consumer throughput
  5. Use TLS in production (Tls.Enabled = true)
  6. Tune InternalBufferCapacity - too small causes backpressure, too large wastes memory
  7. Use batch publishing for high-throughput sinks (BatchSize, LingerTime)

Released under the MIT License.