Add SingleStoreBulkUpdate for bulk UPDATE operations#20
Conversation
PR SummaryMedium Risk Overview The pipeline validates mappings (keys in mappings, no expression mappings, at least one non-key column), rejects reference tables and shard-key updates, builds the staging table from verbatim Logging adds a dedicated Reviewed by Cursor Bugbot for commit 74eaf40. Configure here. |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 2 potential issues.
Bugbot Autofix prepared fixes for both issues found in the latest run.
- ✅ Fixed: Empty row sources behave inconsistently
- Empty DataRow sequences are now materialized once before staging and short-circuit with zero counts consistently.
- ✅ Fixed: Staging shard key order invalid
- The staging shard key is only reused when it matches the leading primary-key columns in order, otherwise it is omitted safely.
Or push these changes by commenting:
@cursor push 448fadace9
Preview (448fadace9)
diff --git a/src/SingleStoreConnector/SingleStoreBulkUpdate.cs b/src/SingleStoreConnector/SingleStoreBulkUpdate.cs
--- a/src/SingleStoreConnector/SingleStoreBulkUpdate.cs
+++ b/src/SingleStoreConnector/SingleStoreBulkUpdate.cs
@@ -28,66 +28,66 @@
/// </summary>
/// <param name="connection">The <see cref="SingleStoreConnection"/> to use.</param>
/// <param name="transaction">(Optional) The <see cref="SingleStoreTransaction"/> to use.</param>
- public SingleStoreBulkUpdate(SingleStoreConnection connection, SingleStoreTransaction? transaction = null)
- {
- m_connection = connection ?? throw new ArgumentNullException(nameof(connection));
- m_transaction = transaction;
- m_logger = m_connection.LoggingConfiguration.BulkUpdateLogger;
- m_warnings = [];
- ColumnMappings = [];
- KeyColumns = [];
- }
+ public SingleStoreBulkUpdate(SingleStoreConnection connection, SingleStoreTransaction? transaction = null)
+ {
+ m_connection = connection ?? throw new ArgumentNullException(nameof(connection));
+ m_transaction = transaction;
+ m_logger = m_connection.LoggingConfiguration.BulkUpdateLogger;
+ m_warnings = [];
+ ColumnMappings = [];
+ KeyColumns = [];
+ }
- /// <summary>
- /// Gets or sets the name of the destination table.
- /// </summary>
- public string? DestinationTableName { get; set; }
+ /// <summary>
+ /// Gets or sets the name of the destination table.
+ /// </summary>
+ public string? DestinationTableName { get; set; }
- /// <summary>
- /// Gets the list of key columns used for the JOIN condition.
- /// These columns identify which rows to update.
- /// </summary>
- public List<string> KeyColumns { get; }
+ /// <summary>
+ /// Gets the list of key columns used for the JOIN condition.
+ /// These columns identify which rows to update.
+ /// </summary>
+ public List<string> KeyColumns { get; }
- /// <summary>
- /// Gets the collection of column mappings between source data and destination table.
- /// </summary>
- public List<SingleStoreBulkCopyColumnMapping> ColumnMappings { get; }
+ /// <summary>
+ /// Gets the collection of column mappings between source data and destination table.
+ /// </summary>
+ public List<SingleStoreBulkCopyColumnMapping> ColumnMappings { get; }
- /// <summary>
- /// Gets or sets the timeout in seconds for bulk operations.
- /// </summary>
- public int BulkCopyTimeout { get; set; } = 30;
+ /// <summary>
+ /// Gets or sets the timeout in seconds for bulk operations.
+ /// </summary>
+ public int BulkCopyTimeout { get; set; } = 30;
- /// <summary>
- /// Gets or sets the number of rows to stage before firing the SingleStoreRowsStaged event.
- /// Only applies to the staging phase (LOAD DATA), not the UPDATE execution.
- /// Set to 0 to disable progress notifications.
- /// </summary>
- public int NotifyAfter { get; set; }
+ /// <summary>
+ /// Gets or sets the number of rows to stage before firing the SingleStoreRowsStaged event.
+ /// Only applies to the staging phase (LOAD DATA), not the UPDATE execution.
+ /// Set to 0 to disable progress notifications.
+ /// </summary>
+ public int NotifyAfter { get; set; }
- /// <summary>
- /// Gets or sets whether to compute the RowsMatched count via a COUNT query.
- /// Default is true. Set to false to skip the COUNT query for better performance.
- /// When false, RowsMatched will be null in the result.
- /// </summary>
- public bool ComputeRowsMatched { get; set; } = true;
+ /// <summary>
+ /// Gets or sets whether to compute the RowsMatched count via a COUNT query.
+ /// Default is true. Set to false to skip the COUNT query for better performance.
+ /// When false, RowsMatched will be null in the result.
+ /// </summary>
+ public bool ComputeRowsMatched { get; set; } = true;
- /// <summary>
- /// This event is raised every time that the number of rows specified by the <see cref="NotifyAfter"/> property have been processed.
- /// </summary>
- /// <remarks>
- /// <para>Receipt of a RowsStaged event does not imply that any rows have been sent to the server or committed.</para>
- /// <para>The <see cref="SingleStoreRowsStagedEventArgs.Abort"/> property can be set to <c>true</c> by the event handler to abort the staging.</para>
- /// </remarks>
- public event SingleStoreRowsStagedEventHandler? SingleStoreRowsStaged;
+ /// <summary>
+ /// This event is raised every time that the number of rows specified by the <see cref="NotifyAfter"/> property have been processed.
+ /// </summary>
+ /// <remarks>
+ /// <para>Receipt of a RowsStaged event does not imply that any rows have been sent to the server or committed.</para>
+ /// <para>The <see cref="SingleStoreRowsStagedEventArgs.Abort"/> property can be set to <c>true</c> by the event handler to abort the staging.</para>
+ /// </remarks>
+ public event SingleStoreRowsStagedEventHandler? SingleStoreRowsStaged;
/// <summary>
/// Updates rows in the destination table using the data in the supplied <see cref="DataTable"/>.
/// </summary>
/// <param name="dataTable">The <see cref="DataTable"/> containing the key and update column values.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public SingleStoreBulkUpdateResult WriteToServer(DataTable dataTable)
+ public SingleStoreBulkUpdateResult WriteToServer(DataTable dataTable)
{
ArgumentNullException.ThrowIfNull(dataTable);
#pragma warning disable CA2012 // Safe because method completes synchronously
@@ -101,7 +101,7 @@
/// <param name="dataTable">The <see cref="DataTable"/> containing the key and update column values.</param>
/// <param name="cancellationToken">A token to cancel the asynchronous operation.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(DataTable dataTable, CancellationToken cancellationToken = default)
+ public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(DataTable dataTable, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(dataTable);
return await WriteToServerAsync(IOBehavior.Asynchronous, dataTable, cancellationToken).ConfigureAwait(false);
@@ -112,7 +112,7 @@
/// </summary>
/// <param name="dataRows">The collection of <see cref="DataRow"/> objects containing the key and update column values.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public SingleStoreBulkUpdateResult WriteToServer(IEnumerable<DataRow> dataRows)
+ public SingleStoreBulkUpdateResult WriteToServer(IEnumerable<DataRow> dataRows)
{
ArgumentNullException.ThrowIfNull(dataRows);
#pragma warning disable CA2012 // Safe because method completes synchronously
@@ -126,7 +126,7 @@
/// <param name="dataRows">The collection of <see cref="DataRow"/> objects containing the key and update column values.</param>
/// <param name="cancellationToken">A token to cancel the asynchronous operation.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IEnumerable<DataRow> dataRows, CancellationToken cancellationToken = default)
+ public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IEnumerable<DataRow> dataRows, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(dataRows);
return await WriteToServerAsync(IOBehavior.Asynchronous, dataRows, cancellationToken).ConfigureAwait(false);
@@ -137,7 +137,7 @@
/// </summary>
/// <param name="dataReader">The <see cref="IDataReader"/> to read the key and update column values from.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public SingleStoreBulkUpdateResult WriteToServer(IDataReader dataReader)
+ public SingleStoreBulkUpdateResult WriteToServer(IDataReader dataReader)
{
ArgumentNullException.ThrowIfNull(dataReader);
#pragma warning disable CA2012 // Safe because method completes synchronously
@@ -151,7 +151,7 @@
/// <param name="dataReader">The <see cref="IDataReader"/> to read the key and update column values from.</param>
/// <param name="cancellationToken">A token to cancel the asynchronous operation.</param>
/// <returns>A <see cref="SingleStoreBulkUpdateResult"/> describing the result of the operation.</returns>
- public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IDataReader dataReader, CancellationToken cancellationToken = default)
+ public async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IDataReader dataReader, CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(dataReader);
return await WriteToServerAsync(IOBehavior.Asynchronous, dataReader, cancellationToken).ConfigureAwait(false);
@@ -168,7 +168,7 @@
/// </param>
/// <param name="source">The source data: a <see cref="DataTable"/>, a sequence of <see cref="DataRow"/>, or an <see cref="IDataReader"/>.</param>
/// <param name="cancellationToken">A token to cancel the asynchronous operation.</param>
- private async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IOBehavior ioBehavior, object source, CancellationToken cancellationToken)
+ private async ValueTask<SingleStoreBulkUpdateResult> WriteToServerAsync(IOBehavior ioBehavior, object source, CancellationToken cancellationToken)
{
// Validate configuration before touching the connection so misconfiguration fails fast and cheaply.
ValidateColumnMappings();
@@ -179,10 +179,12 @@
// Reset any warnings from a previous call so the result only reflects this operation.
m_warnings.Clear();
- // Short-circuit input whose row count is known to be zero: there is nothing to stage or update, so avoid
- // opening the connection and creating a staging table. (An IDataReader's count is unknown, so it still
- // flows through and stages zero rows naturally.)
- if (GetRowCount(source) == 0)
+ source = NormalizeSource(source, out var rowCount);
+
+ // Short-circuit empty input: there is nothing to stage or update, so avoid opening the connection and
+ // creating a staging table. (An IDataReader's count is unknown, so it still flows through and stages zero
+ // rows naturally.)
+ if (rowCount == 0)
return new SingleStoreBulkUpdateResult(m_warnings.AsReadOnly(), rowsStaged: 0, rowsMatched: ComputeRowsMatched ? 0 : -1, rowsUpdated: 0);
var stopwatch = Stopwatch.StartNew();
@@ -203,7 +205,7 @@
await ValidateSchemaAsync(destinationTableName, ioBehavior, cancellationToken).ConfigureAwait(false);
var updateColumns = GetUpdateColumns();
- Log.StartingBulkUpdate(m_logger, destinationTableName, string.Join(", ", KeyColumns), string.Join(", ", updateColumns), GetRowCount(source));
+ Log.StartingBulkUpdate(m_logger, destinationTableName, string.Join(", ", KeyColumns), string.Join(", ", updateColumns), rowCount);
// Phase 1: create the staging table mirroring the destination column types.
tempTableName = await CreateStagingTableAsync(destinationTableName, ioBehavior, cancellationToken).ConfigureAwait(false);
@@ -240,17 +242,35 @@
/// Returns the number of rows in the source for logging, or <c>-1</c> when the count is not known in advance
/// (for example an <see cref="IDataReader"/>, which is consumed as it is staged).
/// </summary>
- private static int GetRowCount(object source) =>
+ private static int GetRowCount(object source) =>
source switch
{
DataTable dataTable => dataTable.Rows.Count,
ICollection<DataRow> dataRows => dataRows.Count,
IReadOnlyCollection<DataRow> dataRows => dataRows.Count,
+ System.Collections.ICollection dataRows => dataRows.Count,
_ => -1,
};
- private void ValidateColumnMappings()
+ /// <summary>
+ /// Materializes row sequences whose count is unknown so empty sequences can be handled consistently with
+ /// empty <see cref="DataTable"/> sources without consuming lazy input more than once.
+ /// </summary>
+ private static object NormalizeSource(object source, out int rowCount)
{
+ rowCount = GetRowCount(source);
+ if (rowCount == -1 && source is IEnumerable<DataRow> dataRows)
+ {
+ var rows = dataRows as IReadOnlyList<DataRow> ?? dataRows.ToList();
+ rowCount = rows.Count;
+ return rows;
+ }
+
+ return source;
+ }
+
+ private void ValidateColumnMappings()
+ {
// Ensure the caller specified at least one key column.
// Key columns define the JOIN condition between the destination table and the staging table.
if (KeyColumns.Count == 0)
@@ -314,7 +334,7 @@
throw new InvalidOperationException("ColumnMappings must contain at least one non-key column to update.");
}
- private async ValueTask ValidateSchemaAsync(string tableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private async ValueTask ValidateSchemaAsync(string tableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var schemaDetector = new SchemaDetector(m_connection, m_transaction);
@@ -349,7 +369,7 @@
}
}
- private List<string> GetUpdateColumns() =>
+ private List<string> GetUpdateColumns() =>
ColumnMappings
.Select(x => x.DestinationColumn)
.Where(x => !KeyColumns.Contains(x, StringComparer.OrdinalIgnoreCase))
@@ -376,15 +396,15 @@
/// The key columns form the staging table's <c>PRIMARY KEY</c>, so they are always declared <c>NOT NULL</c>
/// even when the destination column is nullable (a nullable primary key column is not allowed, and SQL
/// equality on <c>NULL</c> would not match rows in the join anyway). When the destination table's shard key
- /// is a subset of the key columns, the staging table is sharded the same way so the join can run locally;
- /// otherwise it falls back to the primary key distribution and logs a shard-key mismatch warning.
+ /// matches the leading key columns in order, the staging table is sharded the same way so the join can run
+ /// locally; otherwise it falls back to the primary key distribution and logs a shard-key mismatch warning.
/// </para>
/// <para>
/// This must run on the same open connection (and transaction) used for staging, counting and updating,
/// because the temporary table is session-scoped.
/// </para>
/// </remarks>
- private async Task<string> CreateStagingTableAsync(string destinationTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private async Task<string> CreateStagingTableAsync(string destinationTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
// Generate a unique temporary table name. The "g" suffix on the GUID guarantees the identifier
// starts with a letter regardless of the GUID's first hex digit.
@@ -428,7 +448,7 @@
// table. This also rejects duplicate keys in the source data with a clear primary-key violation.
var primaryKey = $"PRIMARY KEY ({string.Join(", ", KeyColumns.Select(IdentifierHelper.QuoteIdentifier))})";
- var stagingShardKey = ComputeStagingShardKey(shardKeyColumns, keyColumnSet);
+ var stagingShardKey = ComputeStagingShardKey(shardKeyColumns);
var createTableSql = new StringBuilder();
createTableSql.Append("CREATE TEMPORARY TABLE ");
@@ -465,18 +485,19 @@
/// local (non-reshuffled) join whenever possible.
/// </summary>
/// <remarks>
- /// A shard key must be a subset of the primary key, which for the staging table is exactly the key columns.
- /// When the destination's shard key is contained in the key columns we reuse it verbatim (preserving its
- /// column order) so both tables hash to the same partitions. When it is not — for example the destination is
- /// sharded on a column that is not a join key — the staging table cannot be aligned, so we fall back to the
- /// primary-key distribution (by returning an empty list, which omits an explicit shard key) and warn.
+ /// A shard key must be a leading prefix of the primary key in the same order. When the destination's shard key
+ /// matches that prefix, we reuse it verbatim so both tables hash to the same partitions. When it does not —
+ /// for example the destination is sharded on a column that is not a join key, or the caller provided the key
+ /// columns in a different order — the staging table cannot be aligned safely, so we fall back to the primary-key
+ /// distribution (by returning an empty list, which omits an explicit shard key) and warn.
/// </remarks>
- private List<string> ComputeStagingShardKey(List<string> destinationShardKeyColumns, HashSet<string> keyColumnSet)
+ private List<string> ComputeStagingShardKey(List<string> destinationShardKeyColumns)
{
if (destinationShardKeyColumns.Count == 0)
return [];
- if (destinationShardKeyColumns.All(keyColumnSet.Contains))
+ if (destinationShardKeyColumns.Count <= KeyColumns.Count &&
+ destinationShardKeyColumns.SequenceEqual(KeyColumns.Take(destinationShardKeyColumns.Count), StringComparer.OrdinalIgnoreCase))
return destinationShardKeyColumns;
Log.ShardKeyMismatchForBulkUpdate(
@@ -509,7 +530,7 @@
/// destination-name relationship identical between staging and the later <c>UPDATE ... JOIN</c>.
/// </para>
/// </remarks>
- private async Task<int> StageDataAsync(string tempTableName, object source, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private async Task<int> StageDataAsync(string tempTableName, object source, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
var bulkCopy = new SingleStoreBulkCopy(m_connection, m_transaction)
{
@@ -568,31 +589,31 @@
/// copy. Empty input is short-circuited before staging, so the sequence is expected to be non-empty here; it
/// is still guarded so an unexpected empty sequence fails clearly rather than dereferencing a missing row.
/// </remarks>
- private static ValueTask<SingleStoreBulkCopyResult> StageWithBulkCopyAsync(SingleStoreBulkCopy bulkCopy, object source, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private static ValueTask<SingleStoreBulkCopyResult> StageWithBulkCopyAsync(SingleStoreBulkCopy bulkCopy, object source, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
switch (source)
{
- case DataTable dataTable:
- return ioBehavior == IOBehavior.Synchronous
- ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(dataTable))
- : bulkCopy.WriteToServerAsync(dataTable, cancellationToken);
+ case DataTable dataTable:
+ return ioBehavior == IOBehavior.Synchronous
+ ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(dataTable))
+ : bulkCopy.WriteToServerAsync(dataTable, cancellationToken);
- case IEnumerable<DataRow> dataRows:
- var rows = dataRows as IReadOnlyList<DataRow> ?? dataRows.ToList();
- if (rows.Count == 0)
- throw new ArgumentException("Cannot stage an empty sequence of rows.", nameof(source));
- var columnCount = rows[0].Table.Columns.Count;
- return ioBehavior == IOBehavior.Synchronous
- ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(rows, columnCount))
- : bulkCopy.WriteToServerAsync(rows, columnCount, cancellationToken);
+ case IEnumerable<DataRow> dataRows:
+ var rows = dataRows as IReadOnlyList<DataRow> ?? dataRows.ToList();
+ if (rows.Count == 0)
+ throw new ArgumentException("Cannot stage an empty sequence of rows.", nameof(source));
+ var columnCount = rows[0].Table.Columns.Count;
+ return ioBehavior == IOBehavior.Synchronous
+ ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(rows, columnCount))
+ : bulkCopy.WriteToServerAsync(rows, columnCount, cancellationToken);
- case IDataReader dataReader:
- return ioBehavior == IOBehavior.Synchronous
- ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(dataReader))
- : bulkCopy.WriteToServerAsync(dataReader, cancellationToken);
+ case IDataReader dataReader:
+ return ioBehavior == IOBehavior.Synchronous
+ ? new ValueTask<SingleStoreBulkCopyResult>(bulkCopy.WriteToServer(dataReader))
+ : bulkCopy.WriteToServerAsync(dataReader, cancellationToken);
- default:
- throw new ArgumentException($"Unsupported source type '{source.GetType()}'.", nameof(source));
+ default:
+ throw new ArgumentException($"Unsupported source type '{source.GetType()}'.", nameof(source));
}
}
@@ -619,7 +640,7 @@
/// operation because the staging table is session-scoped.
/// </para>
/// </remarks>
- private async Task<int?> ComputeMatchedRowsAsync(string tempTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private async Task<int?> ComputeMatchedRowsAsync(string tempTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (!ComputeRowsMatched)
return null;
@@ -670,7 +691,7 @@
/// operation result.
/// </para>
/// </remarks>
- private async Task<int> ExecuteUpdateAsync(string tempTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
+ private async Task<int> ExecuteUpdateAsync(string tempTableName, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
// Assign each non-key mapped column from the staging row: t.`c1` = s.`c1`, t.`c2` = s.`c2` ...
var setClause = string.Join(
@@ -723,7 +744,7 @@
/// is used deliberately so cleanup still runs after a cancelled or timed-out operation.
/// </para>
/// </remarks>
- private async Task DropStagingTableAsync(string? tempTableName, IOBehavior ioBehavior)
+ private async Task DropStagingTableAsync(string? tempTableName, IOBehavior ioBehavior)
{
if (string.IsNullOrEmpty(tempTableName))
return;
@@ -752,13 +773,13 @@
/// Builds the key-column equi-join predicate shared by the match-count query and the update, joining the
/// destination table (alias <c>t</c>) to the staging table (alias <c>s</c>) on every key column.
/// </summary>
- private string BuildKeyJoinCondition() =>
+ private string BuildKeyJoinCondition() =>
string.Join(
" AND ",
KeyColumns.Select(k => $"t.{IdentifierHelper.QuoteIdentifier(k)} = s.{IdentifierHelper.QuoteIdentifier(k)}"));
- private readonly SingleStoreConnection m_connection;
- private readonly SingleStoreTransaction? m_transaction;
- private readonly ILogger m_logger;
- private readonly List<SingleStoreError> m_warnings;
+ private readonly SingleStoreConnection m_connection;
+ private readonly SingleStoreTransaction? m_transaction;
+ private readonly ILogger m_logger;
+ private readonly List<SingleStoreError> m_warnings;
}
diff --git a/tests/SideBySide/BulkUpdateTests.cs b/tests/SideBySide/BulkUpdateTests.cs
--- a/tests/SideBySide/BulkUpdateTests.cs
+++ b/tests/SideBySide/BulkUpdateTests.cs
@@ -115,6 +115,56 @@
[Theory]
[InlineData(false)]
[InlineData(true)]
+ public async Task UpdatesWithReorderedCompositeKeyColumns(bool isAsync)
+ {
+ using var connection = new SingleStoreConnection(GetLocalConnectionString(database));
+ await connection.OpenAsync();
+ using (var cmd = new SingleStoreCommand(@"drop table if exists bulk_update_reordered_composite;
+create table bulk_update_reordered_composite(tenant_id int, user_id int, email varchar(100), shard key (tenant_id, user_id), primary key (tenant_id, user_id));
+insert into bulk_update_reordered_composite values (1, 100, 'user100@tenant1.com');", connection))
+ {
+ await cmd.ExecuteNonQueryAsync();
+ }
+
+ var dataTable = new DataTable
+ {
+ Columns =
+ {
+ new DataColumn("user_id", typeof(int)),
+ new DataColumn("tenant_id", typeof(int)),
+ new DataColumn("email", typeof(string)),
+ },
+ Rows =
+ {
+ new object[] { 100, 1, "new100@tenant1.com" },
+ },
+ };
+
+ var bulkUpdate = new SingleStoreBulkUpdate(connection)
+ {
+ DestinationTableName = "bulk_update_reordered_composite",
+ KeyColumns = { "user_id", "tenant_id" },
+ ColumnMappings =
+ {
+ new SingleStoreBulkCopyColumnMapping(0, "user_id"),
+ new SingleStoreBulkCopyColumnMapping(1, "tenant_id"),
+ new SingleStoreBulkCopyColumnMapping(2, "email"),
+ },
+ };
+
+ var result = await WriteToServerAsync(bulkUpdate, dataTable, isAsync);
+
+ Assert.Equal(1, result.RowsStaged);
+ Assert.Equal(1, result.RowsMatched);
+ Assert.Equal(1, result.RowsUpdated);
+
+ using var selectCommand = new SingleStoreCommand("select email from bulk_update_reordered_composite where tenant_id = 1 and user_id = 100;", connection);
+ Assert.Equal("new100@tenant1.com", await selectCommand.ExecuteScalarAsync());
+ }
+
+ [Theory]
+ [InlineData(false)]
+ [InlineData(true)]
public async Task UpdatesNothingWhenNoKeysMatch(bool isAsync)
{
using var connection = new SingleStoreConnection(GetLocalConnectionString(database));
@@ -291,6 +341,13 @@
Assert.Equal(0, result.RowsStaged);
Assert.Equal(0, result.RowsMatched);
Assert.Equal(0, result.RowsUpdated);
+
+ var dataRows = dataTable.Rows.Cast<DataRow>();
+ var rowSequenceResult = isAsync ? await bulkUpdate.WriteToServerAsync(dataRows) : bulkUpdate.WriteToServer(dataRows);
+
+ Assert.Equal(0, rowSequenceResult.RowsStaged);
+ Assert.Equal(0, rowSequenceResult.RowsMatched);
+ Assert.Equal(0, rowSequenceResult.RowsUpdated);
}
[Theory]You can send follow-ups to the cloud agent here.
Comment @cursor review or bugbot run to trigger another review on this PR
Reviewed by Cursor Bugbot for commit 74eaf40. Configure here.
| ICollection<DataRow> dataRows => dataRows.Count, | ||
| IReadOnlyCollection<DataRow> dataRows => dataRows.Count, | ||
| _ => -1, | ||
| }; |
There was a problem hiding this comment.
Empty row sources behave inconsistently
Medium Severity
GetRowCount only treats DataTable and materialized row collections as empty, so an empty DataTable.Rows or other non-counted IEnumerable<DataRow> skips the zero-row short-circuit. Staging then throws ArgumentException instead of returning the same zero-count result as WriteToServer on an empty DataTable.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 74eaf40. Configure here.
| return []; | ||
|
|
||
| if (destinationShardKeyColumns.All(keyColumnSet.Contains)) | ||
| return destinationShardKeyColumns; |
There was a problem hiding this comment.
Staging shard key order invalid
Medium Severity
ComputeStagingShardKey reuses the destination shard key when those column names appear in KeyColumns, but the staging PRIMARY KEY follows KeyColumns order. If that order differs from the destination shard key prefix, CREATE TEMPORARY TABLE can fail even though the join keys are logically valid.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 74eaf40. Configure here.
…he whole op, result warnings possibly being mutated)



No description provided.