Storage Providers
Prerequisites: Connectors Overview
A storage provider implements IStorageProvider - a unified interface for reading and writing files regardless of where they're stored. File-based connectors (CSV, JSON, Parquet, Excel) use storage providers so the same pipeline code works with local files, cloud storage, and SFTP.
The IStorageProvider Interface
```csharp
public interface IStorageProvider
{
    StorageScheme Scheme { get; }
    bool CanHandle(StorageUri uri);
    Task<Stream> OpenReadAsync(StorageUri uri, CancellationToken ct = default);
    Task<Stream> OpenWriteAsync(StorageUri uri, CancellationToken ct = default);
    Task<bool> ExistsAsync(StorageUri uri, CancellationToken ct = default);
    IAsyncEnumerable<StorageItem> ListAsync(StorageUri prefix, bool recursive = false, CancellationToken ct = default);
}
```

Extended interfaces add optional capabilities: IDeletableStorageProvider (delete files), IMoveableStorageProvider (move/rename), IConfigurableStorageProvider (runtime configuration).
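Because the extended interfaces are optional, calling code usually has to check whether a given provider supports a capability before using it. The following is a minimal sketch of that check using C# pattern matching. The types are simplified stand-ins so the example compiles without NPipeline; the `DeleteAsync` method name, the string-based URI parameter, and the assumption that `IDeletableStorageProvider` extends `IStorageProvider` are illustrative guesses, not the library's confirmed API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types so the sketch compiles on its own. The real NPipeline
// interfaces have more members, and DeleteAsync is an assumed method name.
public interface IStorageProvider { }

public interface IDeletableStorageProvider : IStorageProvider
{
    Task DeleteAsync(string uri, CancellationToken ct = default);
}

// A provider with no optional capabilities.
public sealed class ReadOnlyProvider : IStorageProvider { }

// A hypothetical provider that supports deletion.
public sealed class InMemoryDeletableProvider : IDeletableStorageProvider
{
    public HashSet<string> Files { get; } = new() { "mem://bucket/old.csv" };

    public Task DeleteAsync(string uri, CancellationToken ct = default)
    {
        Files.Remove(uri);
        return Task.CompletedTask;
    }
}

public static class StorageCapabilities
{
    // Returns true when the provider supports deletion and the delete was issued,
    // false when the capability is missing.
    public static async Task<bool> TryDeleteAsync(
        IStorageProvider provider, string uri, CancellationToken ct = default)
    {
        if (provider is IDeletableStorageProvider deletable)
        {
            await deletable.DeleteAsync(uri, ct);
            return true;
        }

        return false; // capability not available; the caller decides how to react
    }
}
```

Returning `false` instead of throwing lets a pipeline skip cleanup steps on providers that cannot delete, while still failing loudly on real I/O errors raised by the delete call itself.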
Choosing a Provider
| Provider | System | Package |
|---|---|---|
| Built-in | Local file system | NPipeline.StorageProviders |
| AWS S3 | Amazon S3 | NPipeline.StorageProviders.S3.Aws |
| Azure Blob | Azure Blob Storage | NPipeline.StorageProviders.Azure |
| ADLS Gen2 | Azure Data Lake Storage Gen2 | NPipeline.StorageProviders.Adls |
| Google Cloud Storage | Google Cloud Storage | NPipeline.StorageProviders.Gcp |
| SFTP | SFTP servers | NPipeline.StorageProviders.Sftp |
| S3-Compatible | MinIO, DigitalOcean Spaces, Cloudflare R2 | NPipeline.StorageProviders.S3.Compatible |
Usage with Connectors
Pass a storage provider to any file-based connector:
```csharp
var storage = new AwsS3StorageProvider(new AwsS3StorageProviderOptions
{
    DefaultRegion = RegionEndpoint.USEast1
});
var config = new CsvConfiguration { HasHeaderRecord = true };
var uri = new StorageUri("s3://my-bucket/data/orders.csv");
var source = new CsvSourceNode<Order>(config, storage, uri);
```

Switch storage without changing pipeline logic:
```csharp
// Local development
var localStorage = new FileSystemStorageProvider();
var localUri = new StorageUri("file:///data/orders.csv");

// Production (same connector, different storage)
var prodStorage = new AwsS3StorageProvider(s3Options);
var prodUri = new StorageUri("s3://prod-bucket/data/orders.csv");
```

DI Registration
Each provider package includes IServiceCollection extensions:
```csharp
services.AddNPipelineS3Storage(options => { /* configure */ });
services.AddNPipelineAzureBlobStorage(options => { /* configure */ });
services.AddNPipelineGcsStorage(options => { /* configure */ });
```

Storage Resolver
StorageResolver automatically selects the right provider based on URI scheme:
```csharp
var resolver = new StorageResolver(new IStorageProvider[]
{
    new FileSystemStorageProvider(),
    new AwsS3StorageProvider(s3Options),
    new AzureBlobStorageProvider(blobOptions)
});

// Resolver picks the right provider based on URI scheme.
// Dispose the stream when done so the underlying connection is released.
using var stream = await resolver.OpenReadAsync(new StorageUri("s3://bucket/file.csv"));
```

Custom Provider
See Implementing a Custom Provider if you need to support a storage system that isn't covered.
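To give a feel for what a custom provider involves, here is a rough sketch of a provider for a made-up `mem://` scheme, together with a tiny resolver that picks the first provider whose `CanHandle` accepts the URI. It is not the NPipeline implementation: `ISimpleStorageProvider`, `InMemoryStorageProvider`, and `SimpleResolver` are hypothetical names, `System.Uri` stands in for `StorageUri`, only two interface members are kept, and dispatching through `CanHandle` is an assumption about how scheme-based selection could work.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-in for the real interface: System.Uri replaces StorageUri
// and only two members are kept, so the sketch compiles without NPipeline.
public interface ISimpleStorageProvider
{
    bool CanHandle(Uri uri);
    Task<Stream> OpenReadAsync(Uri uri, CancellationToken ct = default);
}

// Hypothetical provider that serves content from memory under a "mem" scheme.
public sealed class InMemoryStorageProvider : ISimpleStorageProvider
{
    private readonly Dictionary<string, byte[]> _files = new();

    public void Add(string path, string content) =>
        _files[path] = Encoding.UTF8.GetBytes(content);

    // Claim only URIs whose scheme is "mem".
    public bool CanHandle(Uri uri) =>
        string.Equals(uri.Scheme, "mem", StringComparison.OrdinalIgnoreCase);

    public Task<Stream> OpenReadAsync(Uri uri, CancellationToken ct = default)
    {
        if (!_files.TryGetValue(uri.AbsolutePath, out var bytes))
            throw new FileNotFoundException($"No in-memory file at {uri}");

        // Read-only view over the stored bytes.
        return Task.FromResult<Stream>(new MemoryStream(bytes, writable: false));
    }
}

// Picks the first registered provider whose CanHandle accepts the URI.
public sealed class SimpleResolver
{
    private readonly IReadOnlyList<ISimpleStorageProvider> _providers;

    public SimpleResolver(IEnumerable<ISimpleStorageProvider> providers) =>
        _providers = providers.ToList();

    public ISimpleStorageProvider Resolve(Uri uri) =>
        _providers.FirstOrDefault(p => p.CanHandle(uri))
        ?? throw new NotSupportedException($"No provider registered for scheme '{uri.Scheme}'");
}
```

Keeping the scheme check inside the provider means registering a new storage system only requires adding it to the provider list; the calling code keeps passing URIs and never names a concrete provider.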
StorageUri
StorageUri is the address type used by all storage providers. It supports local files, cloud storage, and SFTP - with optional query parameters for per-request configuration:
```csharp
// Local file
var local = StorageUri.FromFilePath("/data/orders.csv");

// S3
var s3 = StorageUri.Parse("s3://my-bucket/data/orders.csv?region=us-west-2");

// Azure Blob
var azure = StorageUri.Parse("azure://my-container/data/orders.csv");

// ADLS Gen2
var adls = StorageUri.Parse("adls://my-filesystem/data/orders.parquet");

// GCS
var gcs = StorageUri.Parse("gs://my-bucket/data/orders.csv");

// SFTP
var sftp = StorageUri.Parse("sftp://server.example.com/data/orders.csv");
```

Properties: Scheme, Host, Path, Port, UserInfo, Parameters.
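The property names follow the usual parts of a URI. As an illustration only, the sketch below uses the standard `System.Uri` class (not `StorageUri`) to show which piece of an address each part typically corresponds to. The exact behavior of `StorageUri` is not shown on this page, so treat the mapping as an assumption; `UriParts` is a hypothetical helper, and the `timeout` query parameter is made up for the example.

```csharp
using System;

public static class UriParts
{
    // Splits a storage-style address into its components using System.Uri.
    public static (string Scheme, string Host, int Port, string UserInfo, string Path, string Query)
        Describe(string value)
    {
        var uri = new Uri(value);
        return (uri.Scheme, uri.Host, uri.Port, uri.UserInfo, uri.AbsolutePath, uri.Query);
    }
}
```

For example, `UriParts.Describe("sftp://deploy@server.example.com:2222/data/orders.csv?timeout=30")` yields the scheme `sftp`, host `server.example.com`, port `2222`, user info `deploy`, path `/data/orders.csv`, and query `?timeout=30`.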
Built-in FileSystem Provider
The FileSystemStorageProvider handles file:// URIs and local paths. It's included in NPipeline.StorageProviders with no additional dependencies:
```csharp
var provider = new FileSystemStorageProvider();
var uri = StorageUri.FromFilePath("data/orders.csv");
using var stream = await provider.OpenReadAsync(uri);
```

Common Operations
All providers expose the same operations:
```csharp
// Read
using var readStream = await provider.OpenReadAsync(uri);

// Write
using var writeStream = await provider.OpenWriteAsync(uri);

// Exists
bool exists = await provider.ExistsAsync(uri);

// List (recursive or non-recursive)
await foreach (var item in provider.ListAsync(prefix, recursive: true))
{
    Console.WriteLine($"{item.Uri} - {item.Size} bytes - {item.LastModified}");
}

// Metadata (only for providers that implement IStorageProviderMetadataProvider)
if (provider is IStorageProviderMetadataProvider metadataProvider)
{
    var metadata = await metadataProvider.GetMetadataAsync(uri);
}
```

Next Steps
- Pick a provider from the table above to see configuration details
- Custom Provider - implement your own storage provider
- Connectors - use storage providers with file-based connectors
