Skip to content

AWS SQS Connector

The NPipeline.Connectors.Aws.Sqs package provides specialized source and sink nodes for working with Amazon Simple Queue Service (SQS). This allows you to easily integrate SQS message processing into your pipelines as an input source or an output destination.

This connector uses the robust AWSSDK.SQS library under the hood, so it is powerful and highly configurable for production workloads.

Installation

To use the AWS SQS connector, install the NPipeline.Connectors.Aws.Sqs NuGet package:

bash
dotnet add package NPipeline.Connectors.Aws.Sqs

For the core NPipeline package and other available extensions, see the Installation Guide.

AWS Credentials

The SQS connector supports multiple credential methods to provide flexibility across different deployment scenarios:

Access Key and Secret Key

csharp
var config = new SqsConfiguration
{
    AccessKeyId = "AKIAIOSFODNN7EXAMPLE",
    SecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    Region = "us-east-1"
};

AWS Profile

csharp
var config = new SqsConfiguration
{
    ProfileName = "default",
    Region = "us-east-1"
};

Default Credential Chain

csharp
var config = new SqsConfiguration
{
    Region = "us-east-1"
};

Why multiple credential methods: The default credential chain provides the most flexibility for deployment scenarios (EC2, ECS, Lambda), while explicit credentials are useful for local development and testing. The connector automatically falls back to the default AWS credential chain when no explicit credentials are provided.

SqsSourceNode<T>

The SqsSourceNode<T> continuously polls an SQS queue and emits each deserialized message as an item of type SqsMessage<T>.

Source Configuration

The constructor for SqsSourceNode<T> takes configuration for connecting to SQS:

csharp
public SqsSourceNode(SqsConfiguration configuration)
public SqsSourceNode(IAmazonSQS sqsClient, SqsConfiguration configuration)
  • configuration: The SqsConfiguration object with queue URL, polling settings, and AWS credentials.
  • sqsClient: (Optional) A custom IAmazonSQS client. If not provided, one is created from the configuration.

Example: Reading from an SQS Queue

csharp
using NPipeline.Connectors.Aws.Sqs.Configuration;
using NPipeline.Connectors.Aws.Sqs.Nodes;
using NPipeline.DataFlow.DataStreams;
using NPipeline.DataFlow;
using NPipeline.Execution;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed record OrderMessage(string OrderId, string CustomerId, decimal Amount, DateTime OrderDate);

public sealed class SqsReaderPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var config = new SqsConfiguration
        {
            Region = "us-east-1",
            SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
            MaxNumberOfMessages = 10,
            WaitTimeSeconds = 20,
            VisibilityTimeout = 30
        };

        var sourceNode = new SqsSourceNode<OrderMessage>(config);
        var source = builder.AddSource(sourceNode, "sqs_source");
        var sink = builder.AddSink<ConsoleSinkNode, SqsMessage<OrderMessage>>("console_sink");

        builder.Connect(source, sink);
    }
}

public sealed class ConsoleSinkNode : SinkNode<SqsMessage<OrderMessage>>
{
    public override async Task ConsumeAsync(
        IDataStream<SqsMessage<OrderMessage>> input,
        PipelineContext context,
        IPipelineActivity parentActivity,
        CancellationToken cancellationToken)
    {
        await foreach (var message in input.WithCancellation(cancellationToken))
        {
            Console.WriteLine($"Received: {message.Body}");
        }
    }
}

Polling Configuration

The source node supports configurable polling behavior:

PropertyTypeDefaultDescription
SourceQueueUrlstringRequiredSQS queue URL to poll from
MaxNumberOfMessagesint10Maximum messages per poll (1-10, SQS API limit)
WaitTimeSecondsint20Long polling wait time (0-20 seconds)
VisibilityTimeoutint30Message visibility timeout in seconds
PollingIntervalMsint1000Polling interval when queue is empty (milliseconds)

Why long polling: Long polling (WaitTimeSeconds > 0) reduces cost and empty responses by keeping the request open until messages arrive or the timeout expires. The default of 20 seconds maximizes cost efficiency by minimizing the number of empty poll requests.

SqsSinkNode<T>

The SqsSinkNode<T> writes items from the pipeline to an SQS queue by serializing them to JSON.

Sink Configuration

The constructor for SqsSinkNode<T> takes configuration for connecting to SQS:

csharp
public SqsSinkNode(SqsConfiguration configuration)
public SqsSinkNode(IAmazonSQS sqsClient, SqsConfiguration configuration)
  • configuration: The SqsConfiguration object with queue URL, batch settings, and AWS credentials.
  • sqsClient: (Optional) A custom IAmazonSQS client. If not provided, one is created from the configuration.

Example: Writing to an SQS Queue

csharp
using NPipeline.Connectors.Aws.Sqs.Configuration;
using NPipeline.Connectors.Aws.Sqs.Nodes;
using NPipeline.Execution;
using NPipeline.Extensions.Testing;
using NPipeline.Nodes;
using NPipeline.Pipeline;

public sealed record ProcessedOrder(string OrderId, string CustomerId, decimal Amount, bool IsValid);

public sealed class SqsWriterPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var config = new SqsConfiguration
        {
            Region = "us-east-1",
            SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/processed-orders",
            BatchSize = 10
        };

        var source = builder.AddSource<InMemorySourceNode<ProcessedOrder>, ProcessedOrder>("source");
        var sinkNode = new SqsSinkNode<ProcessedOrder>(config);
        var sink = builder.AddSink(sinkNode, "sqs_sink");

        builder.Connect(source, sink);
    }
}

Sending Configuration

The sink node supports configurable sending behavior:

PropertyTypeDefaultDescription
SinkQueueUrlstringRequiredSQS queue URL to send messages to
BatchSizeint10Batch size for sending messages (1-10)
DelaySecondsint0Message delivery delay (0-900 seconds)
MessageAttributesIDictionary<string, MessageAttributeValue>?nullMessage attributes to add to all outgoing messages

Acknowledgment Strategies

The SQS connector provides multiple acknowledgment strategies to handle different processing scenarios:

AutoOnSinkSuccess (Default)

Messages are automatically acknowledged immediately after successful sink processing:

csharp
var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
    SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess
};

Why this is default: Provides the best developer experience with automatic message cleanup after successful processing, reducing the risk of duplicate message handling.

Manual

Messages are sent to the sink but not acknowledged. You must manually call AcknowledgeAsync():

csharp
var config = new SqsConfiguration
{
    AcknowledgmentStrategy = AcknowledgmentStrategy.Manual
};

// In a transform node
public class ManualAckTransform : ITransformNode<SqsMessage<OrderMessage>, SqsMessage<OrderMessage>>
{
    public async Task<SqsMessage<OrderMessage>> TransformAsync(
        SqsMessage<OrderMessage> input,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        // Process the message
        var processed = ProcessOrder(input.Body);
        
        // Manually acknowledge when ready
        await input.AcknowledgeAsync(cancellationToken);
        
        return input.WithBody(processed);
    }
}

When to use: Use when you need fine-grained control over acknowledgment timing, such as when processing depends on external systems with their own transaction boundaries.

Delayed

Messages are acknowledged after a configurable delay:

csharp
var config = new SqsConfiguration
{
    AcknowledgmentStrategy = AcknowledgmentStrategy.Delayed,
    AcknowledgmentDelayMs = 5000 // 5 seconds
};

When to use: Useful when downstream systems need time to process messages before they are removed from the queue, providing a window for recovery if processing fails.

None

Messages are never acknowledged automatically:

csharp
var config = new SqsConfiguration
{
    AcknowledgmentStrategy = AcknowledgmentStrategy.None
};

Warning: Messages remain in the queue until their visibility timeout expires and become available for reprocessing. Use with caution.

Batch Acknowledgment

Batch acknowledgment improves performance by reducing the number of SQS API calls:

csharp
var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
    SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",
    AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,
    
    BatchAcknowledgment = new BatchAcknowledgmentOptions
    {
        // Maximum messages per batch (1-10)
        BatchSize = 10,
        
        // Maximum wait time before flushing partial batch (milliseconds)
        FlushTimeoutMs = 1000,
        
        // Enable automatic batching
        EnableAutomaticBatching = true,
        
        // Maximum concurrent batch operations
        MaxConcurrentBatches = 3
    }
};

Why batch acknowledgment: Reduces SQS API calls and costs by acknowledging multiple messages in a single DeleteMessageBatch operation. The timeout-based flush ensures messages are acknowledged even when the batch size is not reached.

Batch Acknowledgment Options

PropertyTypeDefaultDescription
BatchSizeint10Maximum messages per batch (1-10)
FlushTimeoutMsint1000Maximum wait before flushing partial batch (milliseconds)
EnableAutomaticBatchingbooltrueEnable automatic batch acknowledgment
MaxConcurrentBatchesint3Maximum concurrent batch operations

Parallel Processing

Enable parallel processing for high-throughput scenarios:

csharp
var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
    SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/output-queue",
    
    EnableParallelProcessing = true,
    MaxDegreeOfParallelism = 4
};

When to use: Ideal for CPU-intensive transformations or when processing speed is critical and message order is not important. For ordered processing, keep parallelism disabled.

JSON Serialization

The connector uses System.Text.Json for serialization with configurable options:

PropertyTypeDefaultDescription
PropertyNamingPolicyJsonPropertyNamingPolicyCamelCaseJSON property naming policy
PropertyNameCaseInsensitivebooltrueCase-insensitive property matching

JsonPropertyNamingPolicy

ValueDescription
LowerCaseProperty names converted to lowercase
CamelCaseProperty names converted to camelCase (default)
SnakeCaseProperty names converted to snake_case
PascalCaseProperty names converted to PascalCase
AsIsProperty names used as-is

Error Handling

Retry Logic

The connector automatically retries on transient errors with exponential backoff:

csharp
var config = new SqsConfiguration
{
    MaxRetries = 3,
    RetryBaseDelayMs = 1000
};

Retry behavior: Transient errors (ServiceUnavailable, TooManyRequests, InternalServerError) trigger exponential backoff with jitter to avoid thundering herd problems.

Message Error Handler

Handle deserialization errors per message:

csharp
var config = new SqsConfiguration
{
    SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/input-queue",
    
    // Return true to skip the message, false to fail the pipeline
    MessageErrorHandler = (exception, message) =>
    {
        Console.WriteLine($"Failed to deserialize message {message.MessageId}: {exception.Message}");
        return true; // Skip and continue
    },
    
    ContinueOnError = true
};

Continue on Error

Configure whether to continue processing on errors:

csharp
var config = new SqsConfiguration
{
    ContinueOnError = true // Continue processing on send failures
};

SqsMessage<T>

The SqsMessage<T> wraps deserialized messages with acknowledgment capability:

csharp
public sealed class SqsMessage<T> : IAcknowledgableMessage<T>
{
    // The deserialized message body
    public T Body { get; }
    
    // SQS message ID
    public string MessageId { get; }
    
    // Receipt handle for deletion
    public string ReceiptHandle { get; }
    
    // Message attributes/metadata
    public IDictionary<string, MessageAttributeValue> Attributes { get; }
    
    // Message timestamp
    public DateTime Timestamp { get; }
    
    // Whether the message has been acknowledged
    public bool IsAcknowledged { get; }
    
    // Acknowledge the message (deletes from queue)
    public Task AcknowledgeAsync(CancellationToken cancellationToken = default);
    
    // Create a new message with different body but same acknowledgment behavior
    public IAcknowledgableMessage<TNew> WithBody<TNew>(TNew body);
}

Why this wrapper: Preserves acknowledgment context through transformations, allowing you to modify message content while maintaining the ability to delete the original message from the queue.

Complete Pipeline Example

csharp
using NPipeline.Connectors.Aws.Sqs.Configuration;
using NPipeline.Connectors.Aws.Sqs.Nodes;
using NPipeline.Pipeline;

public sealed record OrderMessage(string OrderId, string CustomerId, decimal Amount, DateTime OrderDate);

public sealed record ProcessedOrder(string OrderId, string CustomerId, decimal Amount, bool IsValid, DateTime ProcessedAt);

public sealed class SqsProcessingPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var config = new SqsConfiguration
        {
            Region = "us-east-1",
            SourceQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/orders",
            SinkQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/processed-orders",
            MaxNumberOfMessages = 10,
            WaitTimeSeconds = 20,
            AcknowledgmentStrategy = AcknowledgmentStrategy.AutoOnSinkSuccess,
            BatchAcknowledgment = new BatchAcknowledgmentOptions
            {
                BatchSize = 10,
                FlushTimeoutMs = 1000
            }
        };

        // Add SQS source
        var source = builder.AddSource(
            new SqsSourceNode<OrderMessage>(config),
            "sqs-source");

        // Add transform to validate and process orders
        var transform = builder.AddTransform<OrderTransform, SqsMessage<OrderMessage>, ProcessedOrder>("transform");

        // Add SQS sink
        var sink = builder.AddSink(
            new SqsSinkNode<ProcessedOrder>(config),
            "sqs-sink");

        // Connect the nodes
        builder.Connect(source, transform);
        builder.Connect(transform, sink);
    }
}

public sealed class OrderTransform : ITransformNode<SqsMessage<OrderMessage>, ProcessedOrder>
{
    public Task<ProcessedOrder> TransformAsync(
        SqsMessage<OrderMessage> input,
        PipelineContext context,
        CancellationToken cancellationToken)
    {
        var order = input.Body;
        
        return Task.FromResult(new ProcessedOrder
        {
            OrderId = order.OrderId,
            CustomerId = order.CustomerId,
            Amount = order.Amount,
            IsValid = order.Amount > 0,
            ProcessedAt = DateTime.UtcNow
        });
    }
}

Configuration Reference

SqsConfiguration

PropertyTypeDefaultDescription
AccessKeyIdstring?nullAWS access key ID
SecretAccessKeystring?nullAWS secret access key
Regionstring"us-east-1"AWS region
ProfileNamestring?nullAWS profile name from ~/.aws/credentials
SourceQueueUrlstring""SQS queue URL for source
SinkQueueUrlstring""SQS queue URL for sink
MaxNumberOfMessagesint10Maximum messages per poll (1-10)
WaitTimeSecondsint20Long polling wait time (0-20)
VisibilityTimeoutint30Message visibility timeout (seconds)
PollingIntervalMsint1000Polling interval when empty (ms)
BatchSizeint10Batch size for sending (1-10)
DelaySecondsint0Message delivery delay (0-900)
MessageAttributesIDictionary<string, MessageAttributeValue>?nullMessage attributes for outgoing messages
PropertyNamingPolicyJsonPropertyNamingPolicyCamelCaseJSON property naming policy
PropertyNameCaseInsensitivebooltrueCase-insensitive property matching
MaxRetriesint3Maximum retry attempts for transient errors
RetryBaseDelayMsint1000Base delay for retry backoff (ms)
ContinueOnErrorbooltrueContinue processing on errors
MessageErrorHandlerFunc<Exception, SqsMessage<object>, bool>?nullHandler for message mapping errors
AcknowledgmentStrategyAcknowledgmentStrategyAutoOnSinkSuccessMessage acknowledgment strategy
AcknowledgmentDelayMsint5000Delay for delayed acknowledgment (ms)
BatchAcknowledgmentBatchAcknowledgmentOptions?nullBatch acknowledgment options
MaxConnectionPoolSizeint10Maximum SQS client connections to pool
MaxDegreeOfParallelismint1Maximum degree of parallelism
EnableParallelProcessingboolfalseEnable parallel message processing

AcknowledgmentStrategy

ValueDescription
AutoOnSinkSuccessAcknowledge immediately after successful sink processing (default)
ManualManual acknowledgment via AcknowledgeAsync()
DelayedAcknowledge after configurable delay
NoneNever acknowledge automatically

BatchAcknowledgmentOptions

PropertyTypeDefaultDescription
BatchSizeint10Maximum messages per batch (1-10)
FlushTimeoutMsint1000Maximum wait before flushing partial batch (ms)
EnableAutomaticBatchingbooltrueEnable automatic batch acknowledgment
MaxConcurrentBatchesint3Maximum concurrent batch operations

For more advanced configuration, refer to the AWS SDK for .NET documentation.

Released under the MIT License.