Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion rust/lance-datafusion/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,82 @@ impl Planner {
})
}

fn is_logical_binary_op(op: &BinaryOperator) -> bool {
matches!(op, BinaryOperator::And | BinaryOperator::Or)
}

fn is_same_logical_binary_op(left: &BinaryOperator, right: &BinaryOperator) -> bool {
matches!(
(left, right),
(BinaryOperator::And, BinaryOperator::And) | (BinaryOperator::Or, BinaryOperator::Or)
)
}

fn flatten_logical_binary_exprs<'a>(
left: &'a SQLExpr,
op: &BinaryOperator,
right: &'a SQLExpr,
) -> Vec<&'a SQLExpr> {
let mut leaves = Vec::new();
let mut stack = vec![right, left];

while let Some(expr) = stack.pop() {
match expr {
SQLExpr::BinaryOp {
left,
op: child_op,
right,
} if Self::is_same_logical_binary_op(op, child_op) => {
stack.push(right.as_ref());
stack.push(left.as_ref());
}
_ => leaves.push(expr),
}
}

leaves
}

fn balanced_binary_expr(mut exprs: VecDeque<Expr>, op: Operator) -> Result<Expr> {
if exprs.is_empty() {
return Err(Error::invalid_input("Binary expression has no operands"));
}

while exprs.len() > 1 {
let mut next = VecDeque::with_capacity(exprs.len().div_ceil(2));
while let Some(left) = exprs.pop_front() {
if let Some(right) = exprs.pop_front() {
next.push_back(Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
op,
Box::new(right),
)));
} else {
next.push_back(left);
}
}
exprs = next;
}

exprs
.pop_front()
.ok_or_else(|| Error::invalid_input("Binary expression has no operands"))
}

fn binary_expr(&self, left: &SQLExpr, op: &BinaryOperator, right: &SQLExpr) -> Result<Expr> {
let df_op = self.binary_op(op)?;
if Self::is_logical_binary_op(op) {
let leaves = Self::flatten_logical_binary_exprs(left, op, right);
let mut exprs = VecDeque::with_capacity(leaves.len());
for leaf in leaves {
exprs.push_back(self.parse_sql_expr(leaf)?);
}
return Self::balanced_binary_expr(exprs, df_op);
}

Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(self.parse_sql_expr(left)?),
self.binary_op(op)?,
df_op,
Box::new(self.parse_sql_expr(right)?),
)))
}
Expand Down Expand Up @@ -1102,6 +1174,22 @@ mod tests {
);
}

#[test]
fn test_parse_deep_logical_filter() {
let planner = Planner::new(Arc::new(Schema::empty()));

for op in ["AND", "OR"] {
let filter = std::iter::repeat_n("true", 1000)
.collect::<Vec<_>>()
.join(&format!(" {op} "));

let expr = planner.parse_filter(&filter).unwrap();
let optimized = planner.optimize_expr(expr).unwrap();

assert_eq!(optimized, lit(true));
}
}

#[derive(Debug, Eq, PartialEq, Hash)]
struct StrictFloat64Udf {
signature: Signature,
Expand Down
44 changes: 18 additions & 26 deletions rust/lance-datafusion/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,16 @@ pub(crate) fn parse_sql_filter(filter: &str) -> Result<Expr> {
let sql = format!("SELECT 1 FROM t WHERE {filter}");
let statement = parse_statement(&sql)?;

let selection = if let Statement::Query(query) = &statement {
if let SetExpr::Select(s) = query.body.as_ref() {
s.selection.as_ref()
} else {
None
}
if let Statement::Query(query) = statement
&& let SetExpr::Select(select) = *query.body
&& let Some(expr) = select.selection
{
Ok(expr)
} else {
None
};
let expr =
selection.ok_or_else(|| Error::invalid_input(format!("Filter is not valid: {filter}")))?;
Ok(expr.clone())
Err(Error::invalid_input(format!(
"Filter is not valid: {filter}"
)))
}
}

/// Parse a SQL expression to Expression. This is more lenient than parse_sql_filter
Expand All @@ -65,22 +63,16 @@ pub(crate) fn parse_sql_expr(expr: &str) -> Result<Expr> {
let sql = format!("SELECT {expr} FROM t");
let statement = parse_statement(&sql)?;

let selection = if let Statement::Query(query) = &statement {
if let SetExpr::Select(s) = query.body.as_ref() {
if let SelectItem::UnnamedExpr(expr) = &s.projection[0] {
Some(expr)
} else {
None
}
} else {
None
}
if let Statement::Query(query) = statement
&& let SetExpr::Select(select) = *query.body
&& let Some(SelectItem::UnnamedExpr(expr)) = select.projection.into_iter().next()
{
Ok(expr)
} else {
None
};
let expr = selection
.ok_or_else(|| Error::invalid_input(format!("Expression is not valid: {expr}")))?;
Ok(expr.clone())
Err(Error::invalid_input(format!(
"Expression is not valid: {expr}"
)))
}
}

fn parse_statement(statement: &str) -> Result<Statement> {
Expand Down
Loading