diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 2e8cf6c92..e2e922a82 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -59,6 +59,8 @@ serde_json = { workspace = true } uuid = { workspace = true, features = ["v4"] } mimalloc = { workspace = true, optional = true, features = [ "local_dynamic_tls", + # Pin to mimalloc v2 until apache/datafusion-python#1607 resolves. + "v2", ] } async-trait = { workspace = true } futures = { workspace = true } diff --git a/skills/datafusion_python/SKILL.md b/skills/datafusion_python/SKILL.md index b6c548a83..296b6fdd5 100644 --- a/skills/datafusion_python/SKILL.md +++ b/skills/datafusion_python/SKILL.md @@ -52,6 +52,8 @@ df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) df = ctx.from_pandas(pandas_df) df = ctx.from_polars(polars_df) df = ctx.from_arrow(arrow_table) +df = ctx.read_batch(record_batch) # one pa.RecordBatch, no named table +df = ctx.read_batches([batch1, batch2]) # several pa.RecordBatch # From SQL df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") @@ -434,11 +436,19 @@ logical operators. ```python col("a").is_null() col("a").is_not_null() -col("a").fill_null(lit(0)) # replace NULL with a value +col("a").fill_null(lit(0)) # replace NULL with a value (single expression) F.coalesce(col("a"), col("b")) # first non-null value F.nullif(col("a"), lit(0)) # return NULL if a == 0 ``` +To fill nulls across the whole DataFrame (optionally limited to a subset of +columns), use the DataFrame-level method: + +```python +df.fill_null(0) # every column +df.fill_null(0, subset=["a", "b"]) # only these columns +``` + ### CASE / WHEN ```python @@ -466,6 +476,15 @@ import pyarrow as pa col("a").cast(pa.float64()) col("a").cast(pa.utf8()) col("a").cast(pa.date32()) + +col("a").try_cast(pa.int32()) # like cast(), but yields NULL on failure instead of erroring +``` + +To cast several columns at once at the DataFrame level, pass a mapping to +`df.cast(...)`: + +```python +df.cast({"a": pa.float64(), "b": pa.int32()}) ``` ### Aliasing @@ -477,7 +496,7 @@ col("a").cast(pa.date32()) ### BETWEEN and IN ```python -col("a").between(lit(1), lit(10)) # 1 <= a <= 10 +col("a").between(1, 10) # 1 <= a <= 10 (bounds auto-wrap) F.in_list(col("a"), [lit(1), lit(2), lit(3)]) # a IN (1, 2, 3) F.in_list(col("a"), [lit(1), lit(2)], negated=True) # a NOT IN (1, 2) ``` @@ -534,7 +553,7 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2))) | `CASE x WHEN 1 THEN 'a' END` | `F.case(col("x")).when(lit(1), lit("a")).end()` | | `CASE WHEN x > 1 THEN 'a' END` | `F.when(col("x") > 1, lit("a")).end()` | | `x IN (1, 2, 3)` | `F.in_list(col("x"), [lit(1), lit(2), lit(3)])` | -| `x BETWEEN 1 AND 10` | `col("x").between(lit(1), lit(10))` | +| `x BETWEEN 1 AND 10` | `col("x").between(1, 10)` | | `CAST(x AS DOUBLE)` | `col("x").cast(pa.float64())` | | `ROW_NUMBER() OVER (...)` | `F.row_number(partition_by=[...], order_by=[...])` | | `SUM(x) OVER (...)` | `F.sum(col("x")).over(window)` | @@ -556,8 +575,9 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2))) - arithmetic between two literals with no column involved: `lit(1) - col("discount")` is fine, but `lit(1) - lit(2)` needs both - values that must carry a specific Arrow type, via `lit(pa.scalar(...))` - - `.when(...)`, `.otherwise(...)`, `F.nullif(...)`, `.between(...)`, - `F.in_list(...)` and similar method/function arguments + - `.when(...)`, `.otherwise(...)`, `F.nullif(...)`, `F.in_list(...)` + and similar method/function arguments (note: `.between(...)` + auto-wraps its bounds, so `col("a").between(1, 10)` needs no `lit()`) 3. **Column name quoting**: Column names are normalized to lowercase by default in both `select("...")` and `col("...")`. To reference a column with @@ -576,7 +596,8 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2))) partition frame, set `window_frame=WindowFrame("rows", None, None)`. 6. **Arithmetic on aggregates belongs in a later `select`, not inside - `aggregate`**: Each item in the aggregate list must be a single aggregate + `aggregate`** *(applies to datafusion-python 53 and earlier; fixed in 54)*: + Each item in the aggregate list must be a single aggregate call (optionally aliased). Combining aggregates with arithmetic inside `aggregate(...)` fails with `Internal error: Invalid aggregate expression`. Alias the aggregates, then compute the combination downstream: @@ -609,6 +630,12 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2))) # (note: join_on keeps both key columns in the output, unlike on="key") li.join_on(failed, col("l_orderkey") == col("o_orderkey")) ``` + When the same column name exists on both sides, `DataFrame.col(name)` + (and `DataFrame.column(name)`) returns a column reference qualified to + that DataFrame, which disambiguates the predicate explicitly: + ```python + li.join_on(failed, li.col("l_orderkey") == failed.col("o_orderkey")) + ``` ## Idiomatic Patterns @@ -746,7 +773,7 @@ F.left(col("c_phone"), lit(2)) # prefix shortcut **Array/List**: `array`, `make_array`, `array_agg`, `array_length`, `array_element`, `array_slice`, `array_append`, `array_prepend`, -`array_concat`, `array_has`, `array_has_all`, `array_has_any`, `array_position`, +`array_concat`, `array_contains`, `array_has`, `array_has_all`, `array_has_any`, `array_position`, `array_remove`, `array_distinct`, `array_sort`, `array_reverse`, `flatten`, `array_to_string`, `array_intersect`, `array_union`, `array_except`, `generate_series` @@ -808,7 +835,6 @@ both `functions` and `functions.spark` may behave differently: | `concat` | NULL inputs treated as empty | NULL inputs propagate to NULL | | `round` | HALF_EVEN (banker's) | HALF_UP | | `trunc` | Numeric truncation | Date truncation | -| `substring` | 1-indexed | 1-indexed (parity) | Pick the namespace whose semantics match your intent — both stay imported side by side; `enable_spark_functions()` only affects SQL.