
MySQL Connector

The NPipeline.Connectors.MySQL package provides source and sink nodes for MySQL and MariaDB. It supports connection pooling, batched inserts, high-throughput bulk loading via LOAD DATA LOCAL INFILE, and ON DUPLICATE KEY upserts.

Installation

```bash
dotnet add package NPipeline.Connectors.MySQL
```

Dependencies: MySqlConnector 2.x

Source Node - MySqlSourceNode<T>

Constructors

```csharp
// Connection string + query
public MySqlSourceNode(
    string connectionString, string query,
    MySqlConfiguration? configuration = null)

// With custom mapper
public MySqlSourceNode(
    string connectionString, string query,
    Func<MySqlRow, T>? customMapper = null,
    MySqlConfiguration? configuration = null)

// Connection pool (recommended for DI)
public MySqlSourceNode(
    IMySqlConnectionPool connectionPool, string query,
    MySqlConfiguration? configuration = null,
    DatabaseParameter[]? parameters = null,
    bool continueOnError = false,
    string? connectionName = null)
```

Sink Node - MySqlSinkNode<T>

| Strategy | Description | Best For |
|---|---|---|
| PerRow | Individual INSERT per item | Small volumes |
| Batch (default) | Batched INSERT statements | Most workloads |
| BulkLoad | LOAD DATA LOCAL INFILE | Maximum throughput |

Constructors

```csharp
// Connection string
public MySqlSinkNode(
    string connectionString, string tableName,
    MySqlConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null)

// Connection pool (recommended for DI)
public MySqlSinkNode(
    IMySqlConnectionPool connectionPool, string tableName,
    MySqlConfiguration? configuration = null,
    Func<T, IEnumerable<DatabaseParameter>>? customMapper = null,
    string? connectionName = null)
```
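The Batch strategy groups items into multi-row INSERT statements. The sketch below illustrates that general idea with plain string building; it is not the connector's implementation. The helper names (`BatchInsertSketch`, `Chunk`, `BuildInsert`) and the `@p{row}_{col}` parameter naming are assumptions, and table and column names are assumed to have been validated before they reach this code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Hypothetical sketch of the Batch write strategy; not NPipeline's actual code.
public static class BatchInsertSketch
{
    // Split items into consecutive groups of at most batchSize elements.
    public static IEnumerable<IReadOnlyList<T>> Chunk<T>(IEnumerable<T> items, int batchSize)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        var current = new List<T>(batchSize);
        foreach (var item in items)
        {
            current.Add(item);
            if (current.Count == batchSize)
            {
                yield return current;
                current = new List<T>(batchSize);
            }
        }
        if (current.Count > 0) yield return current;
    }

    // Build one parameterized multi-row INSERT covering rowCount rows.
    // Identifiers are assumed pre-validated; they are only wrapped in backticks here.
    public static string BuildInsert(string table, IReadOnlyList<string> columns, int rowCount)
    {
        var sql = new StringBuilder();
        sql.Append("INSERT INTO `").Append(table).Append("` (");
        sql.Append(string.Join(", ", columns.Select(c => $"`{c}`")));
        sql.Append(") VALUES ");
        for (int row = 0; row < rowCount; row++)
        {
            if (row > 0) sql.Append(", ");
            sql.Append('(');
            sql.Append(string.Join(", ", columns.Select((_, col) => $"@p{row}_{col}")));
            sql.Append(')');
        }
        return sql.ToString();
    }
}
```

For two rows of `customer_id` and `email`, `BuildInsert` yields a single statement with one placeholder tuple per row, so each batch costs one round trip instead of one per item.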

Configuration

Connection

| Property | Type | Default | Description |
|---|---|---|---|
| ConnectionString | string | "" | MySQL connection string |
| CommandTimeout | int | 30 | Command timeout (seconds) |
| ConnectionTimeout | int | 15 | Connection timeout (seconds) |
| DefaultDatabase | string? | null | Default database name |
| CharacterSet | string | "utf8mb4" | Connection character set |
| ConvertZeroDateTime | bool | true | Convert zero dates (0000-00-00) to DateTime.MinValue |
| MinPoolSize | int | 1 | Minimum pool size |
| MaxPoolSize | int | 100 | Maximum pool size |

Write

| Property | Type | Default | Description |
|---|---|---|---|
| WriteStrategy | MySqlWriteStrategy | Batch | PerRow, Batch, or BulkLoad |
| BatchSize | int | 100 | Items per batch |
| MaxBatchSize | int | 1000 | Maximum batch size |
| UseTransaction | bool | true | Wrap writes in a transaction |
| UsePreparedStatements | bool | true | Use prepared statements |

Bulk Load

| Property | Type | Default | Description |
|---|---|---|---|
| AllowLoadLocalInfile | bool | false | Enable LOAD DATA LOCAL INFILE (must also be enabled on the MySQL server) |
| BulkLoadBatchSize | int | 5000 | Rows per bulk load batch |
| BulkLoadTimeout | int | 300 | Bulk load timeout (seconds) |
| FieldTerminator | char | ',' | Field separator |
| LineTerminator | char | '\n' | Line separator |
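LOAD DATA reads delimited text, so each row has to be serialized with FieldTerminator and LineTerminator, and any terminator characters inside a value have to be escaped. A minimal sketch, assuming the generated statement uses MySQL's default escape character (backslash) and its `\N` convention for NULL; the connector's own serializer may differ, and `LoadDataFormatSketch.FormatRow` is a hypothetical name.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical sketch: serialize one row for LOAD DATA LOCAL INFILE.
// Assumes FIELDS ESCAPED BY '\\' (MySQL's default escape character).
public static class LoadDataFormatSketch
{
    public static string FormatRow(object?[] values, char fieldTerminator = ',', char lineTerminator = '\n')
    {
        var line = new StringBuilder();
        for (int i = 0; i < values.Length; i++)
        {
            if (i > 0) line.Append(fieldTerminator);
            if (values[i] is null)
            {
                line.Append(@"\N"); // MySQL reads \N as NULL
                continue;
            }
            // Format numbers with the invariant culture (1.5, never 1,5).
            string text = Convert.ToString(values[i], CultureInfo.InvariantCulture) ?? "";
            foreach (char c in text)
            {
                // Prefix the escape character and both terminators so MySQL reads them literally.
                if (c == '\\' || c == fieldTerminator || c == lineTerminator)
                    line.Append('\\');
                line.Append(c);
            }
        }
        line.Append(lineTerminator);
        return line.ToString();
    }
}
```

Dates are not handled specially here: `Convert.ToString` on a `DateTime` produces a culture-style string that MySQL will not parse, so a real serializer would format them explicitly (for example as `yyyy-MM-dd HH:mm:ss`).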

Upsert

| Property | Type | Default | Description |
|---|---|---|---|
| UseUpsert | bool | false | Enable ON DUPLICATE KEY upserts |
| UpsertKeyColumns | string[] | [] | Key columns for conflict detection |
| OnDuplicateKeyAction | OnDuplicateKeyAction | Update | Update, Ignore, or Replace |
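In MySQL and MariaDB, ON DUPLICATE KEY fires when an insert collides with any PRIMARY KEY or UNIQUE index on the table, so the columns in UpsertKeyColumns need to be backed by such an index. The sketch below shows one plausible mapping of the three actions to SQL (INSERT ... ON DUPLICATE KEY UPDATE, INSERT IGNORE, REPLACE); that mapping, the `DuplicateAction` enum, and `UpsertSqlSketch` are assumptions, not confirmed connector behavior. It uses the `VALUES(col)` form, which MariaDB supports and MySQL 8.0.20+ still accepts but has deprecated in favor of row aliases.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical enum mirroring the documented Update / Ignore / Replace options.
public enum DuplicateAction { Update, Ignore, Replace }

public static class UpsertSqlSketch
{
    public static string Build(string table, IReadOnlyList<string> columns,
                               IReadOnlyCollection<string> keyColumns, DuplicateAction action)
    {
        string columnList = string.Join(", ", columns.Select(c => $"`{c}`"));
        string placeholders = string.Join(", ", columns.Select((_, i) => $"@p{i}"));

        switch (action)
        {
            case DuplicateAction.Ignore:
                // Colliding rows are skipped (INSERT IGNORE also downgrades some other errors to warnings).
                return $"INSERT IGNORE INTO `{table}` ({columnList}) VALUES ({placeholders})";
            case DuplicateAction.Replace:
                // REPLACE deletes the conflicting row, then inserts the new one.
                return $"REPLACE INTO `{table}` ({columnList}) VALUES ({placeholders})";
            default:
                // Update every non-key column with the value from the attempted insert.
                var updates = columns
                    .Where(c => !keyColumns.Contains(c, StringComparer.OrdinalIgnoreCase))
                    .Select(c => $"`{c}` = VALUES(`{c}`)")
                    .ToList();
                if (updates.Count == 0)
                    throw new ArgumentException("At least one non-key column is required for Update.");
                return $"INSERT INTO `{table}` ({columnList}) VALUES ({placeholders}) " +
                       $"ON DUPLICATE KEY UPDATE {string.Join(", ", updates)}";
        }
    }
}
```

Because the key columns are excluded from the UPDATE list, re-running the same write leaves the row unchanged, which is what makes the upsert path idempotent.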

Error Handling

| Property | Type | Default | Description |
|---|---|---|---|
| ContinueOnError | bool | false | Continue processing after row-level errors |
| MaxRetryAttempts | int | 3 | Maximum retry attempts |
| RetryDelay | TimeSpan | - | Delay between retries |
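MaxRetryAttempts and RetryDelay describe a fixed-delay retry loop. A minimal sketch, assuming MaxRetryAttempts counts retries after the first attempt (so up to four attempts with the default of 3) and that every exception is treated as transient; the connector may count differently or retry only specific MySQL errors. `RetrySketch.ExecuteAsync` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Runs operation, retrying up to maxRetryAttempts extra times with a fixed delay between tries.
    // Once the retries are used up (or cancellation is requested), the last exception propagates.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        int maxRetryAttempts,
        TimeSpan retryDelay,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken).ConfigureAwait(false);
            }
            catch (Exception) when (attempt < maxRetryAttempts && !cancellationToken.IsCancellationRequested)
            {
                // Treat the failure as transient and wait before the next attempt.
                await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
```

A production version would usually retry only errors known to be transient (deadlocks, lock wait timeouts, dropped connections) rather than every exception.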

Dependency Injection

```csharp
// Register the connector with a default connection string and configuration
services.AddMySqlConnector(options =>
{
    options.DefaultConnectionString = "Server=localhost;Database=mydb;User=app;Password=secret;";
    options.DefaultConfiguration = new MySqlConfiguration
    {
        CharacterSet = "utf8mb4",
        WriteStrategy = MySqlWriteStrategy.Batch,
        BatchSize = 500
    };
});

// Register an additional named connection
services.AddMySqlConnection("replica", "Server=replica-db;...");
```
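Nodes can then take the pooled connection from the container through the pool-based constructors. This fragment assumes that AddMySqlConnector registers IMySqlConnectionPool, that the connectionName argument selects a connection registered with AddMySqlConnection, and that omitting it uses the default connection; `provider` (a built IServiceProvider) and `Customer` stand in for your own objects, and the NPipeline using directives are omitted because their namespaces are not documented here.

```csharp
using Microsoft.Extensions.DependencyInjection;

// Assumption: AddMySqlConnector registers IMySqlConnectionPool in the container.
var pool = provider.GetRequiredService<IMySqlConnectionPool>();

// Reads go to the "replica" connection registered with AddMySqlConnection.
var source = new MySqlSourceNode<Customer>(
    pool, "SELECT customer_id, email FROM customers",
    connectionName: "replica");

// Assumption: omitting connectionName writes through the default connection.
var sink = new MySqlSinkNode<Customer>(pool, "customers");
```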

Attribute Mapping

Convention-Based

By convention, C# PascalCase property names map to MySQL snake_case column names:

  • CustomerId → customer_id
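The convention can be approximated with a small converter. This is a sketch under assumptions: the connector's exact rule for acronyms and digits is not documented, so the handling of runs of capitals below (HTTPStatus → http_status, OrderID → order_id) is a guess, and `SnakeCaseSketch.ToSnakeCase` is hypothetical.

```csharp
using System.Text;

public static class SnakeCaseSketch
{
    public static string ToSnakeCase(string name)
    {
        var result = new StringBuilder(name.Length + 8);
        for (int i = 0; i < name.Length; i++)
        {
            char c = name[i];
            if (char.IsUpper(c))
            {
                // A new word starts after a lowercase letter or digit, e.g. the 'I' in "CustomerId".
                bool prevIsLowerOrDigit = i > 0 && (char.IsLower(name[i - 1]) || char.IsDigit(name[i - 1]));
                // A new word also starts where an acronym ends, e.g. the 'S' in "HTTPStatus".
                bool endsAcronym = i > 0 && char.IsUpper(name[i - 1])
                                   && i + 1 < name.Length && char.IsLower(name[i + 1]);
                if (prevIsLowerOrDigit || endsAcronym) result.Append('_');
                result.Append(char.ToLowerInvariant(c));
            }
            else
            {
                result.Append(c);
            }
        }
        return result.ToString();
    }
}
```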

[Column] / [IgnoreColumn] (Cross-Connector)

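```csharp
using NPipeline.Connectors.Attributes;

public class Customer
{
    // Explicit column name; overrides the naming convention
    [Column("customer_id")]
    public int CustomerId { get; set; }

    public string FirstName { get; set; } = "";
    public string LastName { get; set; } = "";

    // Computed property: not mapped to any column
    [IgnoreColumn]
    public string FullName => $"{FirstName} {LastName}";
}
```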

[MySqlColumn] (Connector-Specific)

```csharp
using MySqlConnector; // MySqlDbType
using NPipeline.Connectors.MySQL.Mapping;

public class Customer
{
    [MySqlColumn("customer_id", PrimaryKey = true)]
    public int CustomerId { get; set; }

    [MySqlColumn("email", DbType = MySqlDbType.VarChar, Size = 255)]
    public string Email { get; set; } = "";
}
```

Delivery Semantics

| Semantic | Data Loss | Duplicates | Overhead |
|---|---|---|---|
| AtLeastOnce (default) | No | Possible | Low |
| AtMostOnce | Possible | No | Low |
| ExactlyOnce | No | No | High |
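The difference between the first two rows comes down to whether progress is recorded before or after the write. The simulation below is a generic illustration, not the connector's mechanism: a first run crashes partway through, a second run resumes from the last checkpoint, and the output shows either a duplicate (at-least-once) or a gap (at-most-once). Exactly-once delivery avoids both, typically by committing the write and the checkpoint atomically or by making writes idempotent, which is consistent with its higher overhead. `DeliverySketch` is hypothetical.

```csharp
using System.Collections.Generic;

// Generic illustration of delivery semantics; not the connector's implementation.
public static class DeliverySketch
{
    // Processes items 1..n: the first run crashes while handling crashAt,
    // and a second run resumes after the last saved checkpoint.
    // Returns every item that reached the destination, in write order.
    public static List<int> Simulate(bool checkpointBeforeWrite, int n, int crashAt)
    {
        var written = new List<int>();
        int checkpoint = 0; // last item recorded as done

        for (int item = 1; item <= n; item++)
        {
            if (checkpointBeforeWrite)
            {
                checkpoint = item;          // at-most-once: record progress first...
                if (item == crashAt) break; // ...crash before the write, so the item is lost
                written.Add(item);
            }
            else
            {
                written.Add(item);          // at-least-once: write first...
                if (item == crashAt) break; // ...crash before the checkpoint, so the item is replayed
                checkpoint = item;
            }
        }

        // Second run: resume after the checkpoint and finish normally.
        for (int item = checkpoint + 1; item <= n; item++)
        {
            written.Add(item);
            checkpoint = item;
        }
        return written;
    }
}
```

With five items and a crash at item 3, the write-first order produces 1, 2, 3, 3, 4, 5 and the checkpoint-first order produces 1, 2, 4, 5.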

Checkpointing

| Strategy | Description |
|---|---|
| None (default) | No checkpointing |
| InMemory | Transient recovery within a single run |
| Offset | Track position via a monotonic column |
| KeyBased | Track position by composite keys |
| Cursor | Cursor position tracking |
```csharp
var config = new MySqlConfiguration
{
    CheckpointStrategy = CheckpointStrategy.Offset,
    CheckpointOffsetColumn = "id",
    CheckpointStorage = new FileCheckpointStorage("checkpoints/mysql.json")
};
```
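With the Offset strategy, a resumed run only needs the rows whose offset column is greater than the last saved value, read in ascending order. That is why the column must be monotonic: a row inserted later with a smaller id than the checkpoint would never be read. The in-memory sketch below shows only the filtering rule (the SQL analogue is along the lines of `WHERE id > @checkpoint ORDER BY id`); `OrderRow` and `OffsetResumeSketch` are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

public record OrderRow(long Id, string Sku);

public static class OffsetResumeSketch
{
    // Rows a resumed run would read: strictly after the saved offset, ordered by the offset column.
    // A null checkpoint means "no checkpoint yet", so every row is returned.
    public static IReadOnlyList<OrderRow> RowsAfter(IEnumerable<OrderRow> table, long? lastCheckpoint) =>
        table.Where(r => lastCheckpoint is null || r.Id > lastCheckpoint.Value)
             .OrderBy(r => r.Id)
             .ToList();
}
```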

Performance

Write Strategy Comparison

| Strategy | Throughput | Latency | Use Case |
|---|---|---|---|
| PerRow | Low | Low | Real-time writes, per-row error handling |
| Batch | High | Medium | Most workloads |
| BulkLoad | Very High | High | Large bulk loads |

Bulk Load

Requires AllowLoadLocalInfile = true both in the connector config and on the MySQL server (local_infile = ON).
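Putting both requirements together might look like the fragment below. The connector properties come from the tables above and `Customer` is the class from the Attribute Mapping examples. The SQL in the comment is standard MySQL for enabling local_infile at runtime; it needs an administrative privilege and does not survive a server restart unless the setting is also in the server configuration. MySqlConnector additionally gates this feature behind its own AllowLoadLocalInfile connection-string option; whether the connector sets that for you is not stated here, so it is added explicitly as an assumption.

```csharp
// Server side, run once by an administrator (not persisted across restarts):
//   SET GLOBAL local_infile = 1;

// Assumption: the MySqlConnector option is passed through the connection string unchanged.
const string connectionString =
    "Server=localhost;Database=mydb;User=app;Password=secret;AllowLoadLocalInfile=true;";

var bulkConfig = new MySqlConfiguration
{
    WriteStrategy = MySqlWriteStrategy.BulkLoad,
    AllowLoadLocalInfile = true, // connector-side switch
    BulkLoadBatchSize = 5000,
    BulkLoadTimeout = 300        // seconds
};

var sink = new MySqlSinkNode<Customer>(connectionString, "customers", bulkConfig);
```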

Mapping

| Property | Default | Description |
|---|---|---|
| CaseInsensitiveMapping | true | Case-insensitive column matching |
| CacheMappingMetadata | true | Cache mapping delegates per type |
| ValidateIdentifiers | true | Validate identifiers to prevent SQL injection |
| UsePreparedStatements | true | Reduce query parsing overhead |
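ValidateIdentifiers matters because table and column names cannot be sent as query parameters; they end up in the SQL text itself. A conservative sketch of the idea, assuming an allowlist of ASCII letters, digits, `_` and `$` (not starting with a digit) up to MySQL's 64-character identifier limit, followed by backtick quoting. The connector's actual rule is not documented here, and `IdentifierSketch` is hypothetical.

```csharp
using System;
using System.Text.RegularExpressions;

public static class IdentifierSketch
{
    // Allowlist stricter than MySQL itself; \z (not $) so a trailing newline cannot slip through.
    private static readonly Regex Allowed =
        new Regex(@"^[A-Za-z_$][A-Za-z0-9_$]{0,63}\z", RegexOptions.Compiled);

    // Validate, then wrap in backticks for use in generated SQL.
    public static string Quote(string identifier)
    {
        if (identifier is null || !Allowed.IsMatch(identifier))
            throw new ArgumentException($"Invalid MySQL identifier: '{identifier}'", nameof(identifier));
        return $"`{identifier}`";
    }
}
```

Qualified names such as `mydb.customers` would need to be split on the dot and each part quoted separately.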

Best Practices

  1. Use DI with AddMySqlConnector for production
  2. Use BulkLoad for maximum throughput (requires AllowLoadLocalInfile)
  3. Enable upsert with ON DUPLICATE KEY for idempotent writes
  4. Set CharacterSet = "utf8mb4" for full Unicode support
  5. Use prepared statements for repeated query patterns
  6. Configure checkpointing for long-running pipelines


Released under the MIT License.