feat: Add SQL planner, physical planner, and TableProvider hook for MERGE INTO#22988
feat: Add SQL planner, physical planner, and TableProvider hook for MERGE INTO#22988wirybeaver wants to merge 2 commits into
Conversation
Add merge_into async method to TableProvider trait for MERGE INTO DML support. The method accepts: - source: ExecutionPlan representing the USING clause - on: Expr representing the ON join condition - clauses: Vec<MergeIntoClause> for WHEN MATCHED/NOT MATCHED actions Default implementation returns not_impl_err for tables that don't support MERGE INTO operations.
There was a problem hiding this comment.
@wirybeaver
Thanks for the work on MERGE planning. I found two issues that look like they should be fixed before this lands.
| let target_table_source = self | ||
| .context_provider | ||
| .get_table_source(target_table_ref.clone())?; | ||
| let target_schema = Arc::new(DFSchema::try_from_qualified_schema( |
There was a problem hiding this comment.
I think target aliases need a bit more handling here. Right now MERGE INTO target AS t USING source AS s ON t.id = s.id ... accepts the alias, but target_schema is still qualified with the table name rather than t. That means normal alias-qualified references in the ON clause or WHEN AND expressions will not resolve.
Could we either reject target aliases explicitly for now, or qualify the target side with the alias so this matches the rest of SQL planning?
There was a problem hiding this comment.
Fixed — when an alias is present we now use it as the schema qualifier via TableReference::bare(alias.name.value.clone()), so t.col resolves correctly in ON and WHEN expressions.
| MergeIntoAction::Update(assignments) | ||
| } | ||
| ast::MergeAction::Insert(insert_expr) => { | ||
| let columns: Vec<String> = insert_expr |
There was a problem hiding this comment.
I think the MERGE INSERT actions need the same kind of validation that regular INSERT planning does. At the moment, unknown columns, duplicate columns, and mismatched columns / values lengths can all plan successfully. Empty columns also does not appear to require values for every target column.
That can leave TableProviders receiving malformed actions instead of a validated logical operation. Could we normalize and validate target column names, reject duplicates, and verify that the value count matches either the explicit columns or the full target schema?
There was a problem hiding this comment.
Fixed — we now validate before building the action: duplicate column names are rejected, each named column is checked against the target schema via field_with_unqualified_name, and the value count must equal either the explicit column count (when a column list is given) or the full target schema width (when none is given).
| }) | ||
| .collect::<Result<Vec<_>>>()?; | ||
|
|
||
| // 6. Build the DmlStatement |
There was a problem hiding this comment.
Small cleanup suggestion: this could return Ok(LogicalPlan::Dml(DmlStatement::new(...))) directly instead of binding let plan and then returning Ok(plan).
There was a problem hiding this comment.
Done, returns Ok(LogicalPlan::Dml(...)) directly now.
| .assignments | ||
| .into_iter() | ||
| .map(|assign| { | ||
| let col_name = match &assign.target { |
There was a problem hiding this comment.
Small follow-up suggestion: the update-column extraction and insert-column extraction both walk an ObjectName and take the last identifier. It might be worth adding a small private helper that returns Result<String> and produces a planner error for non-ident parts, rather than using unwrap() at each call site.
There was a problem hiding this comment.
Done — extracted ident_from_object_name_last as a private associated function that returns Result<String> and errors on non-ident parts. Both UPDATE and INSERT column extraction now go through it.
Implement merge_to_plan and merge_clause_to_plan in SQL planner: - Parse Statement::Merge into LogicalPlan::Dml with WriteOp::MergeInto - Resolve target table and plan source (USING clause) as LogicalPlan - Build combined schema for target + source to resolve ON and WHEN expressions - Convert ON condition and WHEN clauses to DataFusion Expr - Handle UPDATE, INSERT, and DELETE actions in WHEN clauses Add physical planner dispatch for WriteOp::MergeInto: - Use source_as_provider() to recover the TableProvider from the TableSource - Extract source ExecutionPlan from children - Call TableProvider::merge_into with source plan, ON condition, and clauses - Wrap errors with MERGE INTO operation context Wire MergeInto's expressions through LogicalPlan tree-traversal so optimizers can rewrite them: add MergeIntoOp::exprs() (stable iteration order: on, then per-clause predicate + action value Exprs) and MergeIntoOp::with_new_exprs() to rebuild the op from a transformed expr vector. Branch LogicalPlan::apply_expressions, map_expressions, and with_new_exprs on WriteOp::MergeInto to use these helpers; other WriteOp variants continue to expose no expressions as before.
cb89e53 to
6f0dfe1
Compare
Which issue does this PR close?
Follow-up to #20763 (merged) which added
MergeIntoOp,MergeIntoClause, and proto types.Rationale for this change
MERGE INTO(SQL:2003) is a widely-used DML statement for upsert/conditional update workloads. This PR wires the types introduced in #20763 through the SQL planner, physical planner, andTableProvidertrait so that table implementations can actually execute merge operations.What changes are included in this PR?
datafusion/catalog—TableProvidertrait extensionmerge_into(source, on, clauses)async method with a defaultnot_impl_errimpl so existing providers are unaffected.datafusion/sql— SQL → LogicalPlanstatement.rs: parseStatement::MergeintoLogicalPlan::DmlwithWriteOp::MergeInto.USINGsource into aLogicalPlan.ONandWHENexpressions.ONcondition andWHEN MATCHED / NOT MATCHEDclauses to DataFusionExpr.datafusion/expr— expression plumbingMergeIntoOp::exprs(): stable iteration over all expressions (ON, then per-clause predicate + action values).MergeIntoOp::with_new_exprs(): rebuild op from a transformed expr vector.LogicalPlan::apply_expressions,map_expressions, andwith_new_exprsonWriteOp::MergeIntoso optimizers can rewrite merge expressions. OtherWriteOpvariants are unchanged.datafusion/core— physical planner dispatchWriteOp::MergeIntoin the physical planner.TableProviderviasource_as_provider(), extract the sourceExecutionPlan, and callTableProvider::merge_into.Are these changes tested?
datafusion/proto/tests/cases/roundtrip_logical_plan.rs(proto round-trip forMergeInto).MergeIntoOp::exprs/with_new_exprsare included indml.rs.TableProviderthat implementsmerge_into; that is left to follow-up once a concrete provider (e.g. Delta Lake) adopts the hook.Are there any user-facing changes?
TableProvidergains a newmerge_intomethod. The default implementation returnsnot_impl_err, so existing implementations compile without changes.MERGE INTO <table> USING <source> ON <cond> WHEN ...SQL syntax is now accepted by the DataFusion SQL parser and planner.