
Parquet Connector

The NPipeline.Connectors.Parquet package reads and writes Apache Parquet files using Parquet.Net. It is optimized for large analytical datasets, with column projection pushdown, parallel multi-file reads, configurable compression, atomic writes, and schema compatibility modes.

Installation

```bash
dotnet add package NPipeline.Connectors.Parquet
```

Dependencies: Parquet.Net 6.x, NPipeline.Connectors, NPipeline.StorageProviders

Storage Abstraction

The Parquet connector uses NPipeline's storage abstraction layer. See the CSV Connector - Storage Abstraction section for full details on StorageUri, IStorageResolver, and when you need an explicit resolver.

```csharp
// Local file (no resolver needed)
var localSource = new ParquetSourceNode<Order>(StorageUri.FromFilePath("orders.parquet"));

// Cloud storage (explicit resolver)
var cloudSource = new ParquetSourceNode<Order>(
    StorageUri.Parse("s3://bucket/orders.parquet"),
    resolver: myResolver);
```

Column Mapping

Attribute-Based Mapping

Use [ParquetColumn] to map properties to Parquet columns:

```csharp
using NPipeline.Connectors.Parquet;

public class Order
{
    [ParquetColumn("order_id")]
    public int Id { get; set; }

    public string CustomerName { get; set; } = string.Empty;

    [ParquetDecimal(18, 2)]  // Required for decimal properties
    public decimal Amount { get; set; }

    [ParquetColumn(Ignore = true)]
    public string InternalNote { get; set; } = string.Empty;
}
```

⚠️ Important: Decimal properties must have a [ParquetDecimal(precision, scale)] attribute. Parquet requires explicit precision and scale for decimal types.

The generic [Column] and [IgnoreColumn] attributes from NPipeline.Connectors.Attributes are also supported.
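A minimal sketch using the generic attributes instead. It assumes that [Column] takes the column name as its constructor argument, as [ParquetColumn] does, and that [IgnoreColumn] is parameterless; the Customer class is hypothetical. Check the NPipeline.Connectors.Attributes API before relying on these signatures.

```csharp
using NPipeline.Connectors.Attributes;

// Hypothetical model that uses the connector-agnostic attributes
// rather than the Parquet-specific ones.
public class Customer
{
    // Assumed: [Column] takes the column name, like [ParquetColumn].
    [Column("customer_id")]
    public int Id { get; set; }

    public string Name { get; set; } = string.Empty;

    // Assumed: [IgnoreColumn] is parameterless and excludes the property from mapping.
    [IgnoreColumn]
    public string DisplayLabel { get; set; } = string.Empty;
}
```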

Supported Type Mappings

| .NET Type | Parquet Type |
|---|---|
| string | UTF8 string |
| int, short, byte | INT32 |
| long | INT64 |
| float | FLOAT |
| double | DOUBLE |
| bool | BOOLEAN |
| decimal | DECIMAL (requires [ParquetDecimal]) |
| DateTime, DateTimeOffset | TIMESTAMP |
| DateOnly | DATE |
| Guid | FIXED_LEN_BYTE_ARRAY |
| byte[] | BYTE_ARRAY |
| enum | INT32 |
| List<T>, T[] | Repeated group |
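The hypothetical Shipment class below exercises several rows of the table; each comment gives the Parquet type the property should map to according to the table. Only the decimal property carries an extra attribute, since decimal is the only type in the table that requires one. Nullable properties are written as optional columns, as noted under Supported .NET Types later on this page.

```csharp
using System;
using System.Collections.Generic;
using NPipeline.Connectors.Parquet;

// Hypothetical enum: written as INT32
public enum ShipmentStatus { Pending, Shipped, Delivered }

// Hypothetical record type covering several mappings from the table
public class Shipment
{
    public long ShipmentId { get; set; }               // INT64
    public Guid TrackingId { get; set; }                // FIXED_LEN_BYTE_ARRAY
    public DateOnly ShipDate { get; set; }              // DATE
    public DateTimeOffset CreatedAt { get; set; }       // TIMESTAMP
    public ShipmentStatus Status { get; set; }          // INT32

    [ParquetDecimal(18, 2)]                             // DECIMAL(18, 2); the attribute is required
    public decimal Cost { get; set; }

    public double? WeightKg { get; set; }               // DOUBLE, optional (nullable)
    public List<string> Labels { get; set; } = new();   // Repeated group
}
```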

Lambda-Based Mapping

```csharp
var source = new ParquetSourceNode<Order>(
    StorageUri.FromFilePath("orders.parquet"),
    row => new Order
    {
        Id = row.Get<int>("order_id"),
        CustomerName = row.Get<string>("customer_name") ?? string.Empty,
        Amount = row.Get<decimal>("amount")
    });
```

Source Node - ParquetSourceNode<T>

Reads Parquet files and emits each row as an item of type T. Streams row groups one at a time to limit memory usage.

Constructors

```csharp
// Attribute-based mapping with optional resolver
public ParquetSourceNode(
    StorageUri uri,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Lambda-based mapping with optional resolver
public ParquetSourceNode(
    StorageUri uri,
    Func<ParquetRow, T> rowMapper,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Attribute-based mapping with explicit provider
public ParquetSourceNode(
    IStorageProvider provider,
    StorageUri uri,
    ParquetConfiguration? configuration = null)

// Lambda-based mapping with explicit provider
public ParquetSourceNode(
    IStorageProvider provider,
    StorageUri uri,
    Func<ParquetRow, T> rowMapper,
    ParquetConfiguration? configuration = null)
```

Example: Column Projection

Read only the columns you need to reduce I/O:

```csharp
var config = new ParquetConfiguration
{
    ProjectedColumns = new[] { "OrderId", "Amount", "Date" },
    FileReadParallelism = 4
};

var source = new ParquetSourceNode<OrderSummary>(
    StorageUri.FromFilePath("orders.parquet"),
    configuration: config);
```

Sink Node - ParquetSinkNode<T>

Writes items to a Parquet file with configurable row groups, compression, and atomic writes.

Constructors

```csharp
// Attribute-based mapping with optional resolver
public ParquetSinkNode(
    StorageUri uri,
    IStorageResolver? resolver = null,
    ParquetConfiguration? configuration = null)

// Attribute-based mapping with explicit provider
public ParquetSinkNode(
    IStorageProvider provider,
    StorageUri uri,
    ParquetConfiguration? configuration = null)
```

Example: Compressed Output

```csharp
var config = new ParquetConfiguration
{
    RowGroupSize = 100_000,
    Compression = CompressionMethod.Snappy,
    UseAtomicWrite = true
};

var sink = new ParquetSinkNode<Order>(
    StorageUri.FromFilePath("output.parquet"),
    configuration: config);
```

Configuration

Write Options

| Property | Type | Default | Description |
|---|---|---|---|
| RowGroupSize | int | 50,000 | Rows buffered before a row group is flushed |
| Compression | CompressionMethod | Snappy | Codec: Snappy, Gzip, or None |
| TargetFileSizeBytes | long? | 256 MB | Start a new file once this size is reached; null disables rotation |
| UseAtomicWrite | bool | true | Write to a temporary file, then rename it on success |
| MaxBufferedRows | int | 250,000 | Maximum rows held across all partition buffers before a flush |
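For example, a sink that rotates output files and bounds the number of buffered rows might be configured as follows. This is a sketch: the values are illustrative rather than recommendations, and how rotated files are named is not covered on this page.

```csharp
using NPipeline.Connectors.Parquet;

var config = new ParquetConfiguration
{
    RowGroupSize = 100_000,                     // rows per row group
    TargetFileSizeBytes = 128L * 1024 * 1024,   // start a new file after roughly 128 MB
    MaxBufferedRows = 200_000,                  // upper bound on rows buffered across all partitions
    UseAtomicWrite = true                       // already the default; shown for clarity
};

var sink = new ParquetSinkNode<Order>(
    StorageUri.FromFilePath("output/orders.parquet"),
    configuration: config);
```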

Read Options

| Property | Type | Default | Description |
|---|---|---|---|
| ProjectedColumns | IReadOnlyList<string>? | null | Column whitelist; only these columns are read |
| SchemaCompatibility | SchemaCompatibilityMode | Strict | Schema validation mode (see below) |
| RecursiveDiscovery | bool | false | Scan subdirectories for Parquet files |
| FileReadParallelism | int | 1 | Number of files read in parallel |
| RowFilter | Func<ParquetRow, bool>? | null | Predicate that filters rows during the read |
| SchemaValidator | Func<ParquetSchema, bool>? | null | Validates the file schema before reading |
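A sketch that combines several read options. The column names and the directory path are hypothetical. It assumes that RowFilter keeps rows for which it returns true, that a directory URI is accepted when RecursiveDiscovery is on, and that ParquetSchema is Parquet.Net's schema class, which exposes GetDataFields().

```csharp
using System.Linq;
using NPipeline.Connectors.Parquet;

var config = new ParquetConfiguration
{
    RecursiveDiscovery = true,   // also pick up files in subdirectories
    FileReadParallelism = 4,     // read up to 4 files at once
    // Assumption: rows for which the predicate returns true are kept.
    RowFilter = row => row.Get<string>("status") == "shipped",
    // Assumption: ParquetSchema is Parquet.Net's type; reject files without an "amount" column.
    SchemaValidator = schema => schema.GetDataFields().Any(field => field.Name == "amount")
};

// Hypothetical root directory of a partitioned dataset.
var source = new ParquetSourceNode<Order>(
    StorageUri.FromFilePath("data/orders"),
    configuration: config);
```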

Error Handling

| Property | Type | Default | Description |
|---|---|---|---|
| RowErrorHandler | Func<Exception, ParquetRow, bool>? | null | Per-row error handler. Return true to skip the row, false to throw. |
| Observer | IParquetConnectorObserver? | null | Listener for lifecycle events, used for metrics and diagnostics |
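A sketch of a handler that tolerates conversion failures and counts the rows it skips. Which exception types count as bad data is an illustrative choice, not something the connector defines; the row argument is unused here but could be logged.

```csharp
using System;
using System.Threading;
using NPipeline.Connectors.Parquet;

int skippedRows = 0;

var config = new ParquetConfiguration
{
    RowErrorHandler = (exception, row) =>
    {
        if (exception is FormatException or InvalidCastException or OverflowException)
        {
            // Interlocked keeps the count correct if several files are read in parallel.
            Interlocked.Increment(ref skippedRows);
            return true;   // skip this row and continue
        }
        return false;      // throw and stop the read
    }
};
```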

Schema Compatibility Modes

| Mode | Behavior |
|---|---|
| Strict | The file schema must exactly match the target type. Extra or missing columns cause an error. |
| Additive | The file may have extra columns (ignored) or missing columns (filled with default values). |
| NameOnly | Columns are matched by name only; type differences are tolerated where a safe conversion exists. |
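For instance, a source reading files written by older and newer producers, whose column sets drifted over time, might opt into Additive mode. This is a sketch; the page does not specify whether a missing column receives the CLR default or the property initializer's value.

```csharp
using NPipeline.Connectors.Parquet;

var config = new ParquetConfiguration
{
    // Extra columns in a file are ignored; columns missing from a file
    // leave the corresponding Order property at a default value.
    SchemaCompatibility = SchemaCompatibilityMode.Additive
};

var source = new ParquetSourceNode<Order>(
    StorageUri.FromFilePath("orders.parquet"),
    configuration: config);
```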

Performance Tips

  • Column projection (ProjectedColumns): reduces I/O significantly for wide tables - only the requested columns are read from disk.
  • Parallel reads (FileReadParallelism): set it above 1 when reading multiple files from storage that handles concurrent access well.
  • Row group sizing: larger groups improve compression ratio but increase memory. 50K–100K rows is a good starting point.
  • Compression: Snappy (default) balances speed and ratio. Use Gzip for better compression at the cost of CPU.
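To make the row group trade-off concrete, the standalone sketch below models how a writer with a fixed RowGroupSize splits a stream of rows: a group is flushed each time the buffer fills, and the leftover rows become a smaller final group. It does not call NPipeline; RowGroupSizes is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Returns the size of each row group a buffering writer would flush.
static List<int> RowGroupSizes(IEnumerable<int> rows, int rowGroupSize)
{
    if (rowGroupSize <= 0) throw new ArgumentOutOfRangeException(nameof(rowGroupSize));

    var flushedGroupSizes = new List<int>();
    var buffer = new List<int>();
    foreach (int row in rows)
    {
        buffer.Add(row);
        if (buffer.Count == rowGroupSize)
        {
            // A real writer would encode and compress each column of the buffer here.
            flushedGroupSizes.Add(buffer.Count);
            buffer.Clear();
        }
    }

    // Rows left over when the input ends form a final, smaller row group.
    if (buffer.Count > 0) flushedGroupSizes.Add(buffer.Count);
    return flushedGroupSizes;
}

var groups = RowGroupSizes(Enumerable.Range(0, 120_000), rowGroupSize: 50_000);
Console.WriteLine(string.Join(", ", groups)); // 50000, 50000, 20000
```

The buffer never holds more than rowGroupSize rows, which is why a larger RowGroupSize raises peak memory even as it gives the codec more data per group to compress.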

Example: Full Pipeline (CSV → Parquet)

```csharp
public sealed class CsvToParquetPipeline : IPipelineDefinition
{
    public void Define(PipelineBuilder builder, PipelineContext context)
    {
        var source = builder.AddSource(
            new CsvSourceNode<Order>(StorageUri.FromFilePath("orders.csv")),
            "csv-source");

        var config = new ParquetConfiguration
        {
            Compression = CompressionMethod.Snappy,
            UseAtomicWrite = true
        };
        var sink = builder.AddSink(
            new ParquetSinkNode<Order>(
                StorageUri.FromFilePath("orders.parquet"),
                configuration: config),
            "parquet-sink");

        builder.Connect(source, sink);
    }
}
```


Storage Abstraction

All file connectors use StorageUri + IStorageResolver:

```csharp
// Local file
var localSource = new ParquetSourceNode<Order>(StorageUri.FromFilePath("orders.parquet"));

// Cloud storage
var cloudSource = new ParquetSourceNode<Order>(
    StorageUri.Parse("s3://my-bucket/data/orders.parquet"),
    resolver: myStorageResolver);
```

Compression Codecs

| Codec | Ratio | Speed | Best For |
|---|---|---|---|
| Snappy (default) | Good | Fast | Most workloads |
| Gzip | Better | Slower | Storage-optimized, archival |
| None | No compression | Fastest | Already-compressed data, debugging |
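For an archival export, the Gzip row of the table might translate into a configuration like this sketch. The output path is hypothetical, and CompressionMethod is the same enum used in the earlier examples.

```csharp
using NPipeline.Connectors.Parquet;

var archiveConfig = new ParquetConfiguration
{
    Compression = CompressionMethod.Gzip,   // better ratio than Snappy, more CPU at write time
    RowGroupSize = 100_000                  // larger groups tend to compress better, at a memory cost
};

var archiveSink = new ParquetSinkNode<Order>(
    StorageUri.FromFilePath("archive/orders.parquet"),
    configuration: archiveConfig);
```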

Supported .NET Types

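This table shows the physical storage type, with the logical annotation in parentheses where one applies.

| .NET Type | Parquet Physical Type (Logical Annotation) |
|---|---|
| bool | BOOLEAN |
| int, long | INT32, INT64 |
| float, double | FLOAT, DOUBLE |
| decimal | FIXED_LEN_BYTE_ARRAY (DECIMAL) |
| string | BYTE_ARRAY (UTF8) |
| DateTime, DateTimeOffset | INT96 or INT64 (TIMESTAMP) |
| byte[] | BYTE_ARRAY |
| Guid | FIXED_LEN_BYTE_ARRAY |
| Nullable variants | Same as the non-nullable type, with OPTIONAL repetition |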

Atomic Writes

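```csharp
var config = new ParquetConfiguration { UseAtomicWrite = true };
```

With atomic writes enabled (the default), data is written to a temporary file that is renamed to the target path only after the write succeeds. A failed write therefore never leaves a partial file at the target path, which is critical in data lake scenarios where downstream jobs may pick up any file that appears.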
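The standalone sketch below illustrates the write-then-rename pattern on a local file system using only System.IO. It is not the connector's implementation, and AtomicWriteAllBytes is a hypothetical helper; it keeps the temporary file in the target's directory so the final move is a same-volume rename.

```csharp
using System;
using System.IO;

static void AtomicWriteAllBytes(string finalPath, byte[] content)
{
    string fullPath = Path.GetFullPath(finalPath);
    string directory = Path.GetDirectoryName(fullPath)!;

    // Temp file next to the target, with a unique name to avoid collisions.
    string tempPath = Path.Combine(directory, $".{Path.GetFileName(fullPath)}.{Guid.NewGuid():N}.tmp");
    try
    {
        File.WriteAllBytes(tempPath, content);
        // Publish the finished file in one step, replacing any existing file.
        File.Move(tempPath, fullPath, overwrite: true);
    }
    catch
    {
        // On failure, remove the partial temp file; the target path is left as it was.
        if (File.Exists(tempPath)) File.Delete(tempPath);
        throw;
    }
}

string target = Path.Combine(Path.GetTempPath(), $"orders-{Guid.NewGuid():N}.parquet");
AtomicWriteAllBytes(target, new byte[] { 0x50, 0x41, 0x52, 0x31 }); // "PAR1" magic bytes as stand-in content
Console.WriteLine(File.Exists(target)); // True
```

File.Move with the overwrite flag requires .NET Core 3.0 or later.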

Best Practices

  1. Use column projection - only read what you need for wide tables
  2. Use Snappy compression (default) - best speed/ratio tradeoff
  3. Keep atomic writes enabled in production (UseAtomicWrite defaults to true) - prevents partial files
  4. Set RowGroupSize = 50,000–100,000 - balances compression and memory
  5. Use RecursiveDiscovery when reading partitioned directories
  6. Use FileReadParallelism > 1 for multi-file reads on fast storage
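What a parallelism cap means in practice can be sketched with .NET 6's Parallel.ForEachAsync: at most four files are in flight at once, and the rest wait for a free slot. This only illustrates the idea behind FileReadParallelism = 4 and is not the connector's code; ReadFileAsync is a hypothetical stand-in that simulates reading one file.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var rowsPerFile = new ConcurrentDictionary<string, int>();
object gate = new();
int inFlight = 0, peakInFlight = 0;

// Simulates opening one Parquet file and streaming its rows.
async ValueTask ReadFileAsync(string path, CancellationToken ct)
{
    lock (gate) { inFlight++; peakInFlight = Math.Max(peakInFlight, inFlight); }
    try
    {
        await Task.Delay(25, ct);   // simulated I/O latency
        rowsPerFile[path] = 1_000;  // simulated row count
    }
    finally
    {
        lock (gate) { inFlight--; }
    }
}

string[] files = new string[8];
for (int i = 0; i < files.Length; i++) files[i] = $"orders/part-{i:D4}.parquet";

// Equivalent cap to FileReadParallelism = 4.
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
await Parallel.ForEachAsync(files, options, ReadFileAsync);

Console.WriteLine($"{rowsPerFile.Count} files read, at most {peakInFlight} at a time");
```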

Released under the MIT License.