AWS SQS Connector

The NPipeline.Connectors.Aws.Sqs package provides source and sink nodes for Amazon SQS. It supports long polling, a configurable visibility timeout, batch send/receive/delete, message attributes, and multiple acknowledgment strategies.

Installation

bash
dotnet add package NPipeline.Connectors.Aws.Sqs

Dependencies: AWSSDK.SQS 4.x, AWSSDK.Extensions.NETCore.Setup 4.x

Source Node - SqsSourceNode<T>

Constructors

csharp
public SqsSourceNode(SqsConfiguration configuration)

// Bring your own client
public SqsSourceNode(IAmazonSQS sqsClient, SqsConfiguration configuration)

Example

csharp
var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
    Region = "us-east-1",
    MaxNumberOfMessages = 10,
    WaitTimeSeconds = 20,       // long polling
    VisibilityTimeout = 60,
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess
};

var source = new SqsSourceNode<Order>(config);
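
The second constructor is useful when the client needs settings that the configuration object does not expose, for example a custom endpoint for a local emulator. The following is a minimal sketch, not the connector's documented setup: the LocalStack-style endpoint, the dummy keys, the `Order` record, and the `NPipeline.Connectors.Aws.Sqs` namespace (inferred from the package name) are all assumptions.

```csharp
using Amazon.Runtime;
using Amazon.SQS;
using NPipeline.Connectors.Aws.Sqs; // assumed namespace, inferred from the package name

// Point the SDK client at a local emulator (endpoint and dummy keys are assumptions).
var sqsClient = new AmazonSQSClient(
    new BasicAWSCredentials("test", "test"),
    new AmazonSQSConfig { ServiceURL = "http://localhost:4566" });

var config = new SqsConfiguration
{
    SourceQueueUrl = "http://localhost:4566/000000000000/orders",
    MaxNumberOfMessages = 10,
    WaitTimeSeconds = 20
};

// The node presumably uses the supplied client instead of building one from the configuration.
var source = new SqsSourceNode<Order>(sqsClient, config);

// Hypothetical message body type.
public record Order(int OrderId, string CustomerName);
```

Passing the client in also makes it possible to substitute a test double for `IAmazonSQS` in unit tests.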

Sink Node - SqsSinkNode<T>

Constructors

csharp
public SqsSinkNode(SqsConfiguration configuration)

// Bring your own client
public SqsSinkNode(IAmazonSQS sqsClient, SqsConfiguration configuration)

Example

csharp
var config = new SqsConfiguration
{
    SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/processed-orders",
    Region = "us-east-1",
    BatchSize = 10,
    DelaySeconds = 0
};

var sink = new SqsSinkNode<ProcessedOrder>(config);

AWS Credentials

The connector resolves credentials in this order:

  1. Explicit credentials - AccessKeyId + SecretAccessKey in configuration
  2. Named profile - ProfileName in configuration
  3. Default credential chain - environment variables, instance profile, etc.

csharp
// Explicit credentials (development only - use IAM roles in production)
var explicitConfig = new SqsConfiguration
{
    AccessKeyId = "AKIA...",
    SecretAccessKey = "...",
    Region = "us-east-1",
    SourceQueueUrl = "..."
};

// Named profile
var profileConfig = new SqsConfiguration
{
    ProfileName = "my-profile",
    Region = "us-east-1",
    SourceQueueUrl = "..."
};

// Default chain (recommended for production - EC2 instance role, ECS task role, etc.)
var defaultChainConfig = new SqsConfiguration
{
    Region = "us-east-1",
    SourceQueueUrl = "..."
};
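
The resolution order can be read as a simple precedence check. The sketch below only illustrates that order; `CredentialSettings`, `CredentialSource`, and `CredentialResolution` are hypothetical names, not types from the connector, and the real implementation may differ.

```csharp
using System;

var settings = new CredentialSettings(AccessKeyId: null, SecretAccessKey: null, ProfileName: "my-profile");
Console.WriteLine(CredentialResolution.Resolve(settings)); // NamedProfile

// Hypothetical stand-in for the credential-related configuration properties.
public record CredentialSettings(string? AccessKeyId, string? SecretAccessKey, string? ProfileName);

public enum CredentialSource { Explicit, NamedProfile, DefaultChain }

public static class CredentialResolution
{
    // Mirrors the documented order: explicit keys, then named profile, then default chain.
    public static CredentialSource Resolve(CredentialSettings s)
    {
        if (!string.IsNullOrEmpty(s.AccessKeyId) && !string.IsNullOrEmpty(s.SecretAccessKey))
            return CredentialSource.Explicit;
        if (!string.IsNullOrEmpty(s.ProfileName))
            return CredentialSource.NamedProfile;
        return CredentialSource.DefaultChain;
    }
}
```

One consequence of this order is that leftover explicit keys in a configuration would take precedence over a profile or an instance role.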

Configuration

AWS

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| AccessKeyId | string? | null | AWS access key ID |
| SecretAccessKey | string? | null | AWS secret access key |
| Region | string | "us-east-1" | AWS region |
| ProfileName | string? | null | AWS credential profile |

Queue

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| SourceQueueUrl | string | - | Source queue URL |
| SinkQueueUrl | string | - | Sink queue URL |

Polling (Source)

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| MaxNumberOfMessages | int | 10 | Messages per receive (1–10) |
| WaitTimeSeconds | int | 20 | Long polling wait (0–20 seconds) |
| VisibilityTimeout | int | 30 | Visibility timeout (seconds) |
| PollingIntervalMs | int | 1000 | Interval between polls (ms) |
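
The first three settings correspond to parameters of the SQS ReceiveMessage API. As a rough illustration of what one poll looks like at the SDK level, here is a sketch that calls AWSSDK.SQS directly. It does not use the connector, and whether the connector issues its requests exactly this way is an assumption; `RawSqsPolling` is a hypothetical helper name.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

public static class RawSqsPolling
{
    public static async Task<List<Message>> ReceiveOnceAsync(
        IAmazonSQS client, string queueUrl, CancellationToken ct)
    {
        var request = new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            MaxNumberOfMessages = 10, // SQS allows 1-10 messages per call
            WaitTimeSeconds = 20,     // values above 0 enable long polling (maximum 20)
            VisibilityTimeout = 30    // seconds the received messages stay hidden from other consumers
        };

        var response = await client.ReceiveMessageAsync(request, ct);

        // Recent SDK versions may return null rather than an empty list.
        return response.Messages ?? new List<Message>();
    }
}
```

With long polling the call itself waits up to WaitTimeSeconds for messages, so PollingIntervalMs mostly matters as a pause between receive calls.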

Batching (Sink)

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| BatchSize | int | 10 | Messages per send batch (1–10) |
| DelaySeconds | int | 0 | Message delivery delay (seconds) |
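
The upper bound of 10 comes from SQS itself, which accepts at most 10 entries per batch request. The sketch below shows, with plain .NET and no connector types, how a stream of outgoing messages might be grouped into batches of that size; the message bodies are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

const int batchSize = 10; // SQS accepts at most 10 entries per SendMessageBatch call

var bodies = Enumerable.Range(1, 23).Select(i => $"message-{i}");

// Enumerable.Chunk (available since .NET 6) splits the sequence into arrays of up to batchSize items.
List<string[]> batches = bodies.Chunk(batchSize).ToList();

foreach (var batch in batches)
    Console.WriteLine($"batch of {batch.Length}");
// prints "batch of 10", "batch of 10", "batch of 3" on separate lines
```

Here 23 messages need 3 requests instead of 23, which is where the cost saving from batching comes from.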

Acknowledgment

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| AcknowledgmentStrategy | AcknowledgmentStrategy | AutoOnSinkSuccess | AutoOnSinkSuccess, Manual, Delayed, or None |
| AcknowledgmentDelayMs | int | 5000 | Delay before acknowledging, in ms (Delayed strategy) |

JSON Serialization

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| PropertyNamingPolicy | JsonPropertyNamingPolicy | CamelCase | CamelCase, PascalCase, Snake_case, Kebab-case |
| PropertyNameCaseInsensitive | bool | true | Case-insensitive deserialization |
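
To show what the two defaults mean for a message body, here is a sketch using System.Text.Json directly. It assumes the connector's settings behave like the equivalent `JsonSerializerOptions` properties, which this page does not confirm; the `Order` record is hypothetical.

```csharp
using System;
using System.Text.Json;

var options = new JsonSerializerOptions
{
    PropertyNamingPolicy = JsonNamingPolicy.CamelCase, // OrderId is written as orderId
    PropertyNameCaseInsensitive = true                 // ORDERID, orderId and OrderId all bind on read
};

string json = JsonSerializer.Serialize(new Order(42, "Ada"), options);
Console.WriteLine(json); // {"orderId":42,"customerName":"Ada"}

Order? parsed = JsonSerializer.Deserialize<Order>("{\"ORDERID\":7,\"CustomerName\":\"Bo\"}", options);
Console.WriteLine(parsed); // Order { OrderId = 7, CustomerName = Bo }

// Hypothetical message body type.
public record Order(int OrderId, string CustomerName);
```

Case-insensitive reading is convenient when producers in other languages do not agree on a naming convention.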

Error Handling

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| MaxRetries | int | 3 | Retry attempts |
| RetryBaseDelayMs | int | 1000 | Base retry delay (ms) |
| ContinueOnError | bool | true | Continue on errors |
| MessageErrorHandler | Func<...>? | null | Custom error handler |
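
The term "base delay" suggests that the wait grows with each attempt, but this page does not state the formula. The sketch below assumes plain exponential backoff (base × 2^attempt) purely for illustration; `RetryDelay` is a hypothetical helper, not part of the connector.

```csharp
using System;

const int maxRetries = 3;          // MaxRetries default from the table
const int retryBaseDelayMs = 1000; // RetryBaseDelayMs default from the table

for (int attempt = 0; attempt < maxRetries; attempt++)
    Console.WriteLine($"retry {attempt + 1}: wait {RetryDelay.For(attempt, retryBaseDelayMs).TotalMilliseconds} ms");
// prints waits of 1000, 2000 and 4000 ms

public static class RetryDelay
{
    // Exponential backoff (an assumption): base * 2^attempt, with attempt counted from 0.
    public static TimeSpan For(int attempt, int baseDelayMs) =>
        TimeSpan.FromMilliseconds(baseDelayMs * Math.Pow(2, attempt));
}
```

Under this assumption the defaults would add up to about 7 seconds of waiting before a message is treated as failed.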

Acknowledgment Strategies

| Strategy | Description |
| --- | --- |
| AutoOnSinkSuccess (default) | Message deleted after successful sink processing |
| Manual | Call message.AcknowledgeAsync() explicitly |
| Delayed | Delete after AcknowledgmentDelayMs (allows downstream confirmation) |
| None | No acknowledgment - message reappears after VisibilityTimeout |

Manual Acknowledgment

csharp
pipeline.AddTransform<SqsMessage<Order>, ProcessedOrder>(async (msg, ct) =>
{
    var result = Process(msg.Body);
    await msg.AcknowledgeAsync(ct); // deletes from queue
    return result;
});
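
The transform above only controls deletion if the source is not also deleting messages on its own, so the source would presumably be configured with the Manual strategy. A minimal configuration sketch follows; the namespace, the `Order` record, and the chosen timeout are assumptions.

```csharp
using NPipeline.Connectors.Aws.Sqs; // assumed namespace, inferred from the package name

var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
    Region = "us-east-1",
    VisibilityTimeout = 120, // should exceed the slowest expected processing time
    AcknowledgmentStrategy = AcknowledgmentStrategy.Manual
};

var source = new SqsSourceNode<Order>(config);

// Hypothetical message body type.
public record Order(int OrderId, string CustomerName);
```

If processing throws before AcknowledgeAsync is reached, the message is not deleted and becomes visible again after VisibilityTimeout, which gives a retry without extra code.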

Batch Acknowledgment

When AutoOnSinkSuccess is combined with batch receive (MaxNumberOfMessages > 1), the received messages are deleted together with DeleteMessageBatch instead of one delete request per message.
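
For reference, this is roughly what a batch delete looks like when written against AWSSDK.SQS directly. It is a sketch of the underlying API call, not the connector's internal code; `RawSqsBatchDelete` is a hypothetical helper name.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

public static class RawSqsBatchDelete
{
    // Deletes up to 10 received messages in one request.
    // Returns the batch entry IDs (here: list indexes) that SQS reported as failed.
    public static async Task<IReadOnlyList<string>> DeleteAsync(
        IAmazonSQS client, string queueUrl, IReadOnlyList<Message> messages, CancellationToken ct)
    {
        var request = new DeleteMessageBatchRequest
        {
            QueueUrl = queueUrl,
            Entries = messages
                .Select((m, i) => new DeleteMessageBatchRequestEntry
                {
                    Id = i.ToString(),              // must be unique within the request
                    ReceiptHandle = m.ReceiptHandle // identifies this particular receive
                })
                .ToList()
        };

        var response = await client.DeleteMessageBatchAsync(request, ct);

        // A batch call can partially fail; recent SDK versions may return null for an empty list.
        var failed = response.Failed ?? new List<BatchResultErrorEntry>();
        return failed.Select(f => f.Id).ToList();
    }
}
```

Entries that fail to delete stay in the queue and become visible again once their visibility timeout expires.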

SqsMessage<T> Wrapper

Source nodes emit SqsMessage<T>, which wraps the deserialized body and exposes SQS metadata:

| Property | Type | Description |
| --- | --- | --- |
| Body | T | Deserialized message body |
| MessageId | string | SQS message ID |
| ReceiptHandle | string | Receipt handle (for acknowledgment) |
| MessageAttributes | Dictionary<string, MessageAttribute> | Custom message attributes |
| ApproximateReceiveCount | int | Number of times message has been received |
| SentTimestamp | DateTimeOffset | When message was sent |
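
The metadata is mostly useful for logging and for spotting messages that keep being redelivered. The helpers below are a hypothetical sketch built only on the properties listed in the table; the class name, the threshold, and the namespace are assumptions.

```csharp
using System;
using NPipeline.Connectors.Aws.Sqs; // assumed namespace, inferred from the package name

public static class SqsMessageDiagnostics
{
    // Hypothetical helper: summarises the metadata carried alongside the body.
    public static string Describe<T>(SqsMessage<T> msg) =>
        $"id={msg.MessageId} receives={msg.ApproximateReceiveCount} sent={msg.SentTimestamp:O}";

    // Hypothetical helper: flags messages that keep coming back to consumers.
    public static bool LooksStuck<T>(SqsMessage<T> msg, int threshold = 3) =>
        msg.ApproximateReceiveCount >= threshold;
}
```

A transform could call `LooksStuck` before processing and log `Describe(msg)` when it returns true.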

Dead-Letter Queue

Configure the DLQ on the SQS queue itself (through a redrive policy), not in the connector. Once a message has been received more than maxReceiveCount times without being deleted, SQS moves it to the DLQ automatically. Use the connector to process DLQ messages:

csharp
// T is the body type; the node emits SqsMessage<Order>
var dlqSource = new SqsSourceNode<Order>(new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders-dlq",
    MaxNumberOfMessages = 10
});
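
Because the DLQ is attached on the SQS side, it is usually set up in infrastructure code or the console. For completeness, here is a sketch of doing it with AWSSDK.SQS by setting the RedrivePolicy queue attribute; `DlqSetup` is a hypothetical helper, and the queue URL and DLQ ARN are supplied by the caller.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

public static class DlqSetup
{
    // Attaches an existing DLQ (identified by its ARN) to a source queue.
    public static Task AttachAsync(
        IAmazonSQS client, string sourceQueueUrl, string dlqArn, int maxReceiveCount)
    {
        // RedrivePolicy is a JSON document stored as a queue attribute.
        string redrivePolicy = JsonSerializer.Serialize(new Dictionary<string, string>
        {
            ["deadLetterTargetArn"] = dlqArn,
            ["maxReceiveCount"] = maxReceiveCount.ToString()
        });

        return client.SetQueueAttributesAsync(new SetQueueAttributesRequest
        {
            QueueUrl = sourceQueueUrl,
            Attributes = new Dictionary<string, string>
            {
                ["RedrivePolicy"] = redrivePolicy
            }
        });
    }
}
```

A small maxReceiveCount moves poison messages aside quickly, while a larger one gives transient failures more chances to succeed.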

Best Practices

  1. Use long polling (WaitTimeSeconds = 20) - reduces empty responses and cost
  2. Set VisibilityTimeout longer than the worst-case processing time - otherwise a message can become visible again and be processed twice
  3. Use IAM roles for credentials in production (EC2 instance role, ECS task role)
  4. Batch operations - SQS charges per request, batching reduces cost
  5. Configure a DLQ on the SQS queue for poison messages
  6. Monitor ApproximateReceiveCount to detect stuck messages
  7. Use FIFO queues when message ordering matters (set MessageGroupId)

Released under the MIT License.