Copilot commented on code in PR #1497: URL: https://github.com/apache/datafusion-python/pull/1497#discussion_r3105579390
########## python/datafusion/AGENTS.md: ########## @@ -0,0 +1,663 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# DataFusion Python DataFrame API Guide + +## What Is DataFusion? + +DataFusion is an **in-process query engine** built on Apache Arrow. It is not a +database -- there is no server, no connection string, and no external +dependencies. You create a `SessionContext`, point it at data (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API described below. + +All data flows through **Apache Arrow**. The canonical Python implementation is +PyArrow (`pyarrow.RecordBatch` / `pyarrow.Table`), but any library that +conforms to the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +can interoperate with DataFusion. + +## Core Abstractions + +| Abstraction | Role | Key import | +|---|---|---| +| `SessionContext` | Entry point. Loads data, runs SQL, produces DataFrames. | `from datafusion import SessionContext` | +| `DataFrame` | Lazy query builder. Each method returns a new DataFrame. | Returned by context methods | +| `Expr` | Expression tree node (column ref, literal, function call, ...). | `from datafusion import col, lit` | +| `functions` | 290+ built-in scalar, aggregate, and window functions. | `from datafusion import functions as F` | + +## Import Conventions + +```python +from datafusion import SessionContext, col, lit +from datafusion import functions as F +``` + +## Data Loading + +```python +ctx = SessionContext() + +# From files +df = ctx.read_parquet("path/to/data.parquet") +df = ctx.read_csv("path/to/data.csv") +df = ctx.read_json("path/to/data.json") + +# From Python objects +df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}) +df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) +df = ctx.from_pandas(pandas_df) +df = ctx.from_polars(polars_df) +df = ctx.from_arrow(arrow_table) + +# From SQL +df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") +``` + +To make a DataFrame queryable by name in SQL, register it first: + +```python +ctx.register_parquet("my_table", "path/to/data.parquet") +ctx.register_csv("my_table", "path/to/data.csv") +``` + +## DataFrame Operations Quick Reference + +Every method returns a **new** DataFrame (immutable/lazy). Chain them fluently. + +### Projection + +```python +df.select("a", "b") # preferred: plain names as strings +df.select(col("a"), (col("b") + 1).alias("b_plus_1")) # use col()/Expr only when you need an expression + +df.with_column("new_col", col("a") + lit(10)) # add one column +df.with_columns( + col("a").alias("x"), + y=col("b") + lit(1), # named keyword form +) + +df.drop("unwanted_col") +df.with_column_renamed("old_name", "new_name") +``` + +When a column is referenced by name alone, pass the name as a string rather +than wrapping it in `col()`. Reach for `col()` only when the projection needs +arithmetic, aliasing, casting, or another expression operation. + +**Case sensitivity**: both `select("Name")` and `col("Name")` lowercase the +identifier. For a column whose real name has uppercase letters, embed double +quotes inside the string: `select('"MyCol"')` or `col('"MyCol"')`. Without the +inner quotes the lookup will fail with `No field named mycol`. + +### Filtering + +```python +df.filter(col("a") > 10) +df.filter(col("a") > 10, col("b") == "x") # multiple = AND +df.filter("a > 10") # SQL expression string +``` + +Raw Python values on the right-hand side of a comparison are auto-wrapped +into literals by the `Expr` operators, so prefer `col("a") > 10` over +`col("a") > lit(10)`. See the Comparisons section and pitfall #2 for the +full rule. + +### Aggregation + +```python +# GROUP BY a, compute sum(b) and count(*) +df.aggregate(["a"], [F.sum(col("b")), F.count(col("a"))]) + +# HAVING equivalent: use the filter keyword on the aggregate function +df.aggregate( + ["region"], + [F.sum(col("sales"), filter=col("sales") > lit(1000)).alias("large_sales")], +) +``` + +As with `select()`, group keys can be passed as plain name strings. Reach for +`col(...)` only when the grouping expression needs arithmetic, aliasing, +casting, or another expression operation. + +Most aggregate functions accept an optional `filter` keyword argument. When +provided, only rows where the filter expression is true contribute to the +aggregate. + +### Sorting + +```python +df.sort(col("a")) # ascending (default) +df.sort(col("a").sort(ascending=False)) # descending +df.sort(col("a").sort(nulls_first=False)) # override null placement +``` + +A plain expression passed to `sort()` is already treated as ascending. Only +reach for `col(...).sort(...)` when you need to override a default (descending +order or null placement). Writing `col("a").sort(ascending=True)` is redundant. + +### Joining + +```python +# Equi-join on shared column name +df1.join(df2, on="key") +df1.join(df2, on="key", how="left") + +# Different column names +df1.join(df2, left_on="id", right_on="fk_id", how="inner") + +# Expression-based join (supports inequality predicates) +df1.join_on(df2, col("a") == col("b"), how="inner") + +# Semi join: keep rows from left where a match exists in right (like EXISTS) +df1.join(df2, on="key", how="semi") + +# Anti join: keep rows from left where NO match exists in right (like NOT EXISTS) +df1.join(df2, on="key", how="anti") +``` + +Join types: `"inner"`, `"left"`, `"right"`, `"full"`, `"semi"`, `"anti"`. + +Inner is the default `how`. Prefer `df1.join(df2, on="key")` over +`df1.join(df2, on="key", how="inner")` — drop `how=` unless you need a +non-inner join type. + +When the two sides' join columns have different native names, use +`left_on=`/`right_on=` with the original names rather than aliasing one side +to match the other — see pitfall #7. + +### Window Functions + +```python +from datafusion import WindowFrame + +# Row number partitioned by group, ordered by value +df.window( + F.row_number( + partition_by=[col("group")], + order_by=[col("value")], + ).alias("rn") +) + +# Using a Window object for reuse +from datafusion.expr import Window + +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], +) +df.select( + col("group"), + col("value"), + F.sum(col("value")).over(win).alias("running_total"), +) + +# With explicit frame bounds +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], + window_frame=WindowFrame("rows", 0, None), # current row to unbounded following +) +``` + +### Set Operations + +```python +df1.union(df2) # UNION ALL (by position) +df1.union(df2, distinct=True) # UNION DISTINCT +df1.union_by_name(df2) # match columns by name, not position +df1.intersect(df2) # INTERSECT ALL +df1.except_all(df2) # EXCEPT ALL +``` + +### Limit and Offset + +```python +df.limit(10) # first 10 rows +df.limit(10, offset=20) # skip 20, then take 10 +``` + +### Deduplication + +```python +df.distinct() # remove duplicate rows +df.distinct_on( # keep first row per group (like DISTINCT ON in Postgres) + [col("a")], # uniqueness columns + [col("a"), col("b")], # output columns + [col("b").sort(ascending=True)], # which row to keep +) +``` + +## Executing and Collecting Results + +DataFrames are lazy until you collect. + +```python +df.show() # print formatted table to stdout +batches = df.collect() # list[pa.RecordBatch] +arr = df.collect_column("col_name") # pa.Array | pa.ChunkedArray (single column) +table = df.to_arrow_table() # pa.Table +pandas_df = df.to_pandas() # pd.DataFrame +polars_df = df.to_polars() # pl.DataFrame +py_dict = df.to_pydict() # dict[str, list] +py_list = df.to_pylist() # list[dict] +count = df.count() # int + +# Streaming +stream = df.execute_stream() # RecordBatchStream (single partition) +for batch in stream: + process(batch) +``` + +### Writing Results + +```python +df.write_parquet("output/") +df.write_csv("output/") +df.write_json("output/") Review Comment: These examples use directory-like paths (e.g., `"output/"`), but `DataFrame.write_parquet`/`write_csv`/`write_json` are documented (and used elsewhere in the repo) as writing to a file path (e.g., `"output.parquet"`). Consider updating the examples to file-like paths to better match the Python API and existing examples. ```suggestion df.write_parquet("output.parquet") df.write_csv("output.csv") df.write_json("output.json") ``` ########## python/datafusion/__init__.py: ########## @@ -15,10 +15,45 @@ # specific language governing permissions and limitations # under the License. -"""DataFusion python package. - -This is a Python library that binds to Apache Arrow in-memory query engine DataFusion. -See https://datafusion.apache.org/python for more information. +"""DataFusion: an in-process query engine built on Apache Arrow. + +DataFusion is not a database -- it has no server and no external dependencies. +You create a :py:class:`SessionContext`, point it at data sources (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API. + +Core abstractions +----------------- +- **SessionContext** -- entry point for loading data, running SQL, and creating + DataFrames. +- **DataFrame** -- lazy query builder. Every method returns a new DataFrame; + call :py:meth:`~datafusion.dataframe.DataFrame.collect` or a ``to_*`` + method to execute. +- **Expr** -- expression tree node for column references, literals, and function + calls. Build with :py:func:`col` and :py:func:`lit`. +- **functions** -- 290+ built-in scalar, aggregate, and window functions. + +Quick start +----------- +:: + + from datafusion import SessionContext, col, lit + from datafusion import functions as F + + ctx = SessionContext() + df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) + result = ( + df.filter(col("a") > lit(1)) Review Comment: The quick-start snippet uses `col("a") > lit(1)`, but the new `python/datafusion/AGENTS.md` guide recommends omitting `lit()` on the RHS of comparisons because Expr operators auto-wrap Python scalars (i.e., `col("a") > 1`). Consider updating this example to be consistent with the recommended style. ```suggestion from datafusion import SessionContext, col from datafusion import functions as F ctx = SessionContext() df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]}) result = ( df.filter(col("a") > 1) ``` ########## python/datafusion/AGENTS.md: ########## @@ -0,0 +1,663 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# DataFusion Python DataFrame API Guide + +## What Is DataFusion? + +DataFusion is an **in-process query engine** built on Apache Arrow. It is not a +database -- there is no server, no connection string, and no external +dependencies. You create a `SessionContext`, point it at data (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API described below. + +All data flows through **Apache Arrow**. The canonical Python implementation is +PyArrow (`pyarrow.RecordBatch` / `pyarrow.Table`), but any library that +conforms to the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +can interoperate with DataFusion. + +## Core Abstractions + +| Abstraction | Role | Key import | +|---|---|---| +| `SessionContext` | Entry point. Loads data, runs SQL, produces DataFrames. | `from datafusion import SessionContext` | +| `DataFrame` | Lazy query builder. Each method returns a new DataFrame. | Returned by context methods | +| `Expr` | Expression tree node (column ref, literal, function call, ...). | `from datafusion import col, lit` | +| `functions` | 290+ built-in scalar, aggregate, and window functions. | `from datafusion import functions as F` | + +## Import Conventions + +```python +from datafusion import SessionContext, col, lit +from datafusion import functions as F +``` + +## Data Loading + +```python +ctx = SessionContext() + +# From files +df = ctx.read_parquet("path/to/data.parquet") +df = ctx.read_csv("path/to/data.csv") +df = ctx.read_json("path/to/data.json") + +# From Python objects +df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}) +df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) +df = ctx.from_pandas(pandas_df) +df = ctx.from_polars(polars_df) +df = ctx.from_arrow(arrow_table) + +# From SQL +df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") +``` + +To make a DataFrame queryable by name in SQL, register it first: + +```python +ctx.register_parquet("my_table", "path/to/data.parquet") +ctx.register_csv("my_table", "path/to/data.csv") +``` + +## DataFrame Operations Quick Reference + +Every method returns a **new** DataFrame (immutable/lazy). Chain them fluently. + +### Projection + +```python +df.select("a", "b") # preferred: plain names as strings +df.select(col("a"), (col("b") + 1).alias("b_plus_1")) # use col()/Expr only when you need an expression + +df.with_column("new_col", col("a") + lit(10)) # add one column +df.with_columns( + col("a").alias("x"), + y=col("b") + lit(1), # named keyword form +) + +df.drop("unwanted_col") +df.with_column_renamed("old_name", "new_name") +``` + +When a column is referenced by name alone, pass the name as a string rather +than wrapping it in `col()`. Reach for `col()` only when the projection needs +arithmetic, aliasing, casting, or another expression operation. + +**Case sensitivity**: both `select("Name")` and `col("Name")` lowercase the +identifier. For a column whose real name has uppercase letters, embed double +quotes inside the string: `select('"MyCol"')` or `col('"MyCol"')`. Without the +inner quotes the lookup will fail with `No field named mycol`. + +### Filtering + +```python +df.filter(col("a") > 10) +df.filter(col("a") > 10, col("b") == "x") # multiple = AND +df.filter("a > 10") # SQL expression string +``` + +Raw Python values on the right-hand side of a comparison are auto-wrapped +into literals by the `Expr` operators, so prefer `col("a") > 10` over +`col("a") > lit(10)`. See the Comparisons section and pitfall #2 for the +full rule. + +### Aggregation + +```python +# GROUP BY a, compute sum(b) and count(*) +df.aggregate(["a"], [F.sum(col("b")), F.count(col("a"))]) + +# HAVING equivalent: use the filter keyword on the aggregate function +df.aggregate( + ["region"], + [F.sum(col("sales"), filter=col("sales") > lit(1000)).alias("large_sales")], +) +``` + +As with `select()`, group keys can be passed as plain name strings. Reach for +`col(...)` only when the grouping expression needs arithmetic, aliasing, +casting, or another expression operation. + +Most aggregate functions accept an optional `filter` keyword argument. When +provided, only rows where the filter expression is true contribute to the +aggregate. + +### Sorting + +```python +df.sort(col("a")) # ascending (default) +df.sort(col("a").sort(ascending=False)) # descending +df.sort(col("a").sort(nulls_first=False)) # override null placement +``` + +A plain expression passed to `sort()` is already treated as ascending. Only +reach for `col(...).sort(...)` when you need to override a default (descending +order or null placement). Writing `col("a").sort(ascending=True)` is redundant. + +### Joining + +```python +# Equi-join on shared column name +df1.join(df2, on="key") +df1.join(df2, on="key", how="left") + +# Different column names +df1.join(df2, left_on="id", right_on="fk_id", how="inner") + +# Expression-based join (supports inequality predicates) +df1.join_on(df2, col("a") == col("b"), how="inner") + +# Semi join: keep rows from left where a match exists in right (like EXISTS) +df1.join(df2, on="key", how="semi") + +# Anti join: keep rows from left where NO match exists in right (like NOT EXISTS) +df1.join(df2, on="key", how="anti") +``` + +Join types: `"inner"`, `"left"`, `"right"`, `"full"`, `"semi"`, `"anti"`. + +Inner is the default `how`. Prefer `df1.join(df2, on="key")` over +`df1.join(df2, on="key", how="inner")` — drop `how=` unless you need a +non-inner join type. + +When the two sides' join columns have different native names, use +`left_on=`/`right_on=` with the original names rather than aliasing one side +to match the other — see pitfall #7. + +### Window Functions + +```python +from datafusion import WindowFrame + +# Row number partitioned by group, ordered by value +df.window( + F.row_number( + partition_by=[col("group")], + order_by=[col("value")], + ).alias("rn") +) + +# Using a Window object for reuse +from datafusion.expr import Window + +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], +) +df.select( + col("group"), + col("value"), + F.sum(col("value")).over(win).alias("running_total"), +) + +# With explicit frame bounds +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], + window_frame=WindowFrame("rows", 0, None), # current row to unbounded following +) +``` + +### Set Operations + +```python +df1.union(df2) # UNION ALL (by position) +df1.union(df2, distinct=True) # UNION DISTINCT +df1.union_by_name(df2) # match columns by name, not position +df1.intersect(df2) # INTERSECT ALL +df1.except_all(df2) # EXCEPT ALL +``` + +### Limit and Offset + +```python +df.limit(10) # first 10 rows +df.limit(10, offset=20) # skip 20, then take 10 +``` + +### Deduplication + +```python +df.distinct() # remove duplicate rows +df.distinct_on( # keep first row per group (like DISTINCT ON in Postgres) + [col("a")], # uniqueness columns + [col("a"), col("b")], # output columns + [col("b").sort(ascending=True)], # which row to keep +) +``` + +## Executing and Collecting Results + +DataFrames are lazy until you collect. + +```python +df.show() # print formatted table to stdout +batches = df.collect() # list[pa.RecordBatch] +arr = df.collect_column("col_name") # pa.Array | pa.ChunkedArray (single column) +table = df.to_arrow_table() # pa.Table +pandas_df = df.to_pandas() # pd.DataFrame +polars_df = df.to_polars() # pl.DataFrame +py_dict = df.to_pydict() # dict[str, list] +py_list = df.to_pylist() # list[dict] +count = df.count() # int + +# Streaming +stream = df.execute_stream() # RecordBatchStream (single partition) +for batch in stream: + process(batch) +``` + +### Writing Results + +```python +df.write_parquet("output/") +df.write_csv("output/") +df.write_json("output/") +``` + +## Expression Building + +### Column References and Literals + +```python +col("column_name") # reference a column +lit(42) # integer literal +lit("hello") # string literal +lit(3.14) # float literal +lit(pa.scalar(value)) # PyArrow scalar (preserves Arrow type) +``` + +`lit()` accepts PyArrow scalars directly -- prefer this over converting Arrow +data to Python and back when working with values extracted from query results. + +### Arithmetic + +```python +col("price") * col("quantity") # multiplication +col("a") + lit(1) # addition +col("a") - col("b") # subtraction +col("a") / lit(2) # division +col("a") % lit(3) # modulo +``` + +### Date Arithmetic + +`Date32` columns require `Interval` types for arithmetic, not `Duration`. Use +PyArrow's `month_day_nano_interval` type, which takes a `(months, days, nanos)` +tuple: + +```python +import pyarrow as pa + +# Subtract 90 days from a date column +col("ship_date") - lit(pa.scalar((0, 90, 0), type=pa.month_day_nano_interval())) + +# Subtract 3 months +col("ship_date") - lit(pa.scalar((3, 0, 0), type=pa.month_day_nano_interval())) +``` + +**Important**: `lit(datetime.timedelta(days=90))` creates a `Duration(µs)` +literal, which is **not** compatible with `Date32` arithmetic. Always use +`pa.month_day_nano_interval()` for date operations. + +### Comparisons + +```python +col("a") > 10 +col("a") >= 10 +col("a") < 10 +col("a") <= 10 +col("a") == "x" +col("a") != "x" +col("a") == None # same as col("a").is_null() +col("a") != None # same as col("a").is_not_null() +``` + +Comparison operators auto-wrap the right-hand Python value into a literal, +so writing `col("a") > lit(10)` is redundant. Drop the `lit()` in +comparisons. Reach for `lit()` only when auto-wrapping does not apply — see +pitfall #2. + +### Boolean Logic + +**Important**: Python's `and`, `or`, `not` keywords do NOT work with Expr +objects. You must use the bitwise operators: + +```python +(col("a") > 1) & (col("b") < 10) # AND +(col("a") > 1) | (col("b") < 10) # OR +~(col("a") > 1) # NOT +``` + +Always wrap each comparison in parentheses when combining with `&`, `|`, `~` +because Python's operator precedence for bitwise operators is different from +logical operators. + +### Null Handling + +```python +col("a").is_null() +col("a").is_not_null() +col("a").fill_null(lit(0)) # replace NULL with a value +F.coalesce(col("a"), col("b")) # first non-null value +F.nullif(col("a"), lit(0)) # return NULL if a == 0 +``` + +### CASE / WHEN + +```python +# Simple CASE (matching on a single expression) +F.case(col("status")) + .when(lit("A"), lit("Active")) + .when(lit("I"), lit("Inactive")) + .otherwise(lit("Unknown")) + +# Searched CASE (each branch has its own predicate) +F.when(col("value") > lit(100), lit("high")) + .when(col("value") > lit(50), lit("medium")) + .otherwise(lit("low")) +``` + +### Casting + +```python +import pyarrow as pa + +col("a").cast(pa.float64()) +col("a").cast(pa.utf8()) +col("a").cast(pa.date32()) +``` + +### Aliasing + +```python +(col("a") + col("b")).alias("total") +``` + +### BETWEEN and IN + +```python +col("a").between(lit(1), lit(10)) # 1 <= a <= 10 +F.in_list(col("a"), [lit(1), lit(2), lit(3)]) # a IN (1, 2, 3) +F.in_list(col("a"), [lit(1), lit(2)], negated=True) # a NOT IN (1, 2) +``` + +### Struct and Array Access + +```python +col("struct_col")["field_name"] # access struct field +col("array_col")[0] # access array element (0-indexed) +col("array_col")[1:3] # array slice (0-indexed) +``` + +## SQL-to-DataFrame Reference + +| SQL | DataFrame API | +|---|---| +| `SELECT a, b` | `df.select("a", "b")` | +| `SELECT a, b + 1 AS c` | `df.select(col("a"), (col("b") + lit(1)).alias("c"))` | +| `SELECT *, a + 1 AS c` | `df.with_column("c", col("a") + lit(1))` | +| `WHERE a > 10` | `df.filter(col("a") > lit(10))` | +| `GROUP BY a` with `SUM(b)` | `df.aggregate(["a"], [F.sum(col("b"))])` | +| `SUM(b) FILTER (WHERE b > 100)` | `F.sum(col("b"), filter=col("b") > lit(100))` | Review Comment: This SQL-to-DataFrame mapping uses `lit(10)` inside a comparison, but earlier the guide recommends relying on Expr comparison operators auto-wrapping RHS Python scalars (e.g., `col("a") > 10`). Consider updating the table entry to match the recommended style to avoid confusing readers. ```suggestion | `WHERE a > 10` | `df.filter(col("a") > 10)` | | `GROUP BY a` with `SUM(b)` | `df.aggregate(["a"], [F.sum(col("b"))])` | | `SUM(b) FILTER (WHERE b > 100)` | `F.sum(col("b"), filter=col("b") > 100)` | ``` ########## python/datafusion/AGENTS.md: ########## @@ -0,0 +1,663 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# DataFusion Python DataFrame API Guide + +## What Is DataFusion? + +DataFusion is an **in-process query engine** built on Apache Arrow. It is not a +database -- there is no server, no connection string, and no external +dependencies. You create a `SessionContext`, point it at data (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API described below. + +All data flows through **Apache Arrow**. The canonical Python implementation is +PyArrow (`pyarrow.RecordBatch` / `pyarrow.Table`), but any library that +conforms to the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +can interoperate with DataFusion. + +## Core Abstractions + +| Abstraction | Role | Key import | +|---|---|---| +| `SessionContext` | Entry point. Loads data, runs SQL, produces DataFrames. | `from datafusion import SessionContext` | +| `DataFrame` | Lazy query builder. Each method returns a new DataFrame. | Returned by context methods | +| `Expr` | Expression tree node (column ref, literal, function call, ...). | `from datafusion import col, lit` | +| `functions` | 290+ built-in scalar, aggregate, and window functions. | `from datafusion import functions as F` | + +## Import Conventions + +```python +from datafusion import SessionContext, col, lit +from datafusion import functions as F +``` + +## Data Loading + +```python +ctx = SessionContext() + +# From files +df = ctx.read_parquet("path/to/data.parquet") +df = ctx.read_csv("path/to/data.csv") +df = ctx.read_json("path/to/data.json") + +# From Python objects +df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}) +df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) +df = ctx.from_pandas(pandas_df) +df = ctx.from_polars(polars_df) +df = ctx.from_arrow(arrow_table) + +# From SQL +df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") +``` + +To make a DataFrame queryable by name in SQL, register it first: + +```python +ctx.register_parquet("my_table", "path/to/data.parquet") +ctx.register_csv("my_table", "path/to/data.csv") +``` + +## DataFrame Operations Quick Reference + +Every method returns a **new** DataFrame (immutable/lazy). Chain them fluently. + +### Projection + +```python +df.select("a", "b") # preferred: plain names as strings +df.select(col("a"), (col("b") + 1).alias("b_plus_1")) # use col()/Expr only when you need an expression + +df.with_column("new_col", col("a") + lit(10)) # add one column +df.with_columns( + col("a").alias("x"), + y=col("b") + lit(1), # named keyword form +) + +df.drop("unwanted_col") +df.with_column_renamed("old_name", "new_name") +``` + +When a column is referenced by name alone, pass the name as a string rather +than wrapping it in `col()`. Reach for `col()` only when the projection needs +arithmetic, aliasing, casting, or another expression operation. + +**Case sensitivity**: both `select("Name")` and `col("Name")` lowercase the +identifier. For a column whose real name has uppercase letters, embed double +quotes inside the string: `select('"MyCol"')` or `col('"MyCol"')`. Without the +inner quotes the lookup will fail with `No field named mycol`. + +### Filtering + +```python +df.filter(col("a") > 10) +df.filter(col("a") > 10, col("b") == "x") # multiple = AND +df.filter("a > 10") # SQL expression string +``` + +Raw Python values on the right-hand side of a comparison are auto-wrapped +into literals by the `Expr` operators, so prefer `col("a") > 10` over +`col("a") > lit(10)`. See the Comparisons section and pitfall #2 for the +full rule. + +### Aggregation + +```python +# GROUP BY a, compute sum(b) and count(*) +df.aggregate(["a"], [F.sum(col("b")), F.count(col("a"))]) + +# HAVING equivalent: use the filter keyword on the aggregate function +df.aggregate( + ["region"], + [F.sum(col("sales"), filter=col("sales") > lit(1000)).alias("large_sales")], +) +``` + +As with `select()`, group keys can be passed as plain name strings. Reach for +`col(...)` only when the grouping expression needs arithmetic, aliasing, +casting, or another expression operation. + +Most aggregate functions accept an optional `filter` keyword argument. When +provided, only rows where the filter expression is true contribute to the +aggregate. + +### Sorting + +```python +df.sort(col("a")) # ascending (default) +df.sort(col("a").sort(ascending=False)) # descending +df.sort(col("a").sort(nulls_first=False)) # override null placement +``` + +A plain expression passed to `sort()` is already treated as ascending. Only +reach for `col(...).sort(...)` when you need to override a default (descending +order or null placement). Writing `col("a").sort(ascending=True)` is redundant. + +### Joining + +```python +# Equi-join on shared column name +df1.join(df2, on="key") +df1.join(df2, on="key", how="left") + +# Different column names +df1.join(df2, left_on="id", right_on="fk_id", how="inner") + +# Expression-based join (supports inequality predicates) +df1.join_on(df2, col("a") == col("b"), how="inner") + +# Semi join: keep rows from left where a match exists in right (like EXISTS) +df1.join(df2, on="key", how="semi") + +# Anti join: keep rows from left where NO match exists in right (like NOT EXISTS) +df1.join(df2, on="key", how="anti") +``` + +Join types: `"inner"`, `"left"`, `"right"`, `"full"`, `"semi"`, `"anti"`. + +Inner is the default `how`. Prefer `df1.join(df2, on="key")` over +`df1.join(df2, on="key", how="inner")` — drop `how=` unless you need a +non-inner join type. + +When the two sides' join columns have different native names, use +`left_on=`/`right_on=` with the original names rather than aliasing one side +to match the other — see pitfall #7. + +### Window Functions + +```python +from datafusion import WindowFrame + +# Row number partitioned by group, ordered by value +df.window( + F.row_number( + partition_by=[col("group")], + order_by=[col("value")], + ).alias("rn") +) + +# Using a Window object for reuse +from datafusion.expr import Window + +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], +) +df.select( + col("group"), + col("value"), + F.sum(col("value")).over(win).alias("running_total"), +) + +# With explicit frame bounds +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], + window_frame=WindowFrame("rows", 0, None), # current row to unbounded following +) +``` + +### Set Operations + +```python +df1.union(df2) # UNION ALL (by position) +df1.union(df2, distinct=True) # UNION DISTINCT +df1.union_by_name(df2) # match columns by name, not position +df1.intersect(df2) # INTERSECT ALL +df1.except_all(df2) # EXCEPT ALL +``` + +### Limit and Offset + +```python +df.limit(10) # first 10 rows +df.limit(10, offset=20) # skip 20, then take 10 +``` + +### Deduplication + +```python +df.distinct() # remove duplicate rows +df.distinct_on( # keep first row per group (like DISTINCT ON in Postgres) + [col("a")], # uniqueness columns + [col("a"), col("b")], # output columns + [col("b").sort(ascending=True)], # which row to keep +) +``` + +## Executing and Collecting Results + +DataFrames are lazy until you collect. + +```python +df.show() # print formatted table to stdout +batches = df.collect() # list[pa.RecordBatch] +arr = df.collect_column("col_name") # pa.Array | pa.ChunkedArray (single column) +table = df.to_arrow_table() # pa.Table +pandas_df = df.to_pandas() # pd.DataFrame +polars_df = df.to_polars() # pl.DataFrame +py_dict = df.to_pydict() # dict[str, list] +py_list = df.to_pylist() # list[dict] +count = df.count() # int + +# Streaming +stream = df.execute_stream() # RecordBatchStream (single partition) +for batch in stream: + process(batch) +``` + +### Writing Results + +```python +df.write_parquet("output/") +df.write_csv("output/") +df.write_json("output/") +``` + +## Expression Building + +### Column References and Literals + +```python +col("column_name") # reference a column +lit(42) # integer literal +lit("hello") # string literal +lit(3.14) # float literal +lit(pa.scalar(value)) # PyArrow scalar (preserves Arrow type) +``` + +`lit()` accepts PyArrow scalars directly -- prefer this over converting Arrow +data to Python and back when working with values extracted from query results. + +### Arithmetic + +```python +col("price") * col("quantity") # multiplication +col("a") + lit(1) # addition +col("a") - col("b") # subtraction +col("a") / lit(2) # division +col("a") % lit(3) # modulo +``` + +### Date Arithmetic + +`Date32` columns require `Interval` types for arithmetic, not `Duration`. Use +PyArrow's `month_day_nano_interval` type, which takes a `(months, days, nanos)` +tuple: + +```python +import pyarrow as pa + +# Subtract 90 days from a date column +col("ship_date") - lit(pa.scalar((0, 90, 0), type=pa.month_day_nano_interval())) + +# Subtract 3 months +col("ship_date") - lit(pa.scalar((3, 0, 0), type=pa.month_day_nano_interval())) +``` + +**Important**: `lit(datetime.timedelta(days=90))` creates a `Duration(µs)` +literal, which is **not** compatible with `Date32` arithmetic. Always use +`pa.month_day_nano_interval()` for date operations. + +### Comparisons + +```python +col("a") > 10 +col("a") >= 10 +col("a") < 10 +col("a") <= 10 +col("a") == "x" +col("a") != "x" +col("a") == None # same as col("a").is_null() +col("a") != None # same as col("a").is_not_null() +``` + +Comparison operators auto-wrap the right-hand Python value into a literal, +so writing `col("a") > lit(10)` is redundant. Drop the `lit()` in +comparisons. Reach for `lit()` only when auto-wrapping does not apply — see +pitfall #2. + +### Boolean Logic + +**Important**: Python's `and`, `or`, `not` keywords do NOT work with Expr +objects. You must use the bitwise operators: + +```python +(col("a") > 1) & (col("b") < 10) # AND +(col("a") > 1) | (col("b") < 10) # OR +~(col("a") > 1) # NOT +``` + +Always wrap each comparison in parentheses when combining with `&`, `|`, `~` +because Python's operator precedence for bitwise operators is different from +logical operators. + +### Null Handling + +```python +col("a").is_null() +col("a").is_not_null() +col("a").fill_null(lit(0)) # replace NULL with a value +F.coalesce(col("a"), col("b")) # first non-null value +F.nullif(col("a"), lit(0)) # return NULL if a == 0 +``` + +### CASE / WHEN + +```python +# Simple CASE (matching on a single expression) +F.case(col("status")) + .when(lit("A"), lit("Active")) + .when(lit("I"), lit("Inactive")) + .otherwise(lit("Unknown")) + +# Searched CASE (each branch has its own predicate) +F.when(col("value") > lit(100), lit("high")) + .when(col("value") > lit(50), lit("medium")) + .otherwise(lit("low")) Review Comment: The Simple CASE example is not valid Python as written: the method chain starts on a new line after `F.case(col("status"))` without wrapping the expression in parentheses or using a line-continuation character. Please rewrite the snippet so it can be copy/pasted and executed (e.g., wrap the full chain in parentheses). ```suggestion ( F.case(col("status")) .when(lit("A"), lit("Active")) .when(lit("I"), lit("Inactive")) .otherwise(lit("Unknown")) ) # Searched CASE (each branch has its own predicate) ( F.when(col("value") > lit(100), lit("high")) .when(col("value") > lit(50), lit("medium")) .otherwise(lit("low")) ) ``` ########## python/datafusion/AGENTS.md: ########## @@ -0,0 +1,663 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# DataFusion Python DataFrame API Guide + +## What Is DataFusion? + +DataFusion is an **in-process query engine** built on Apache Arrow. It is not a +database -- there is no server, no connection string, and no external +dependencies. You create a `SessionContext`, point it at data (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API described below. + +All data flows through **Apache Arrow**. The canonical Python implementation is +PyArrow (`pyarrow.RecordBatch` / `pyarrow.Table`), but any library that +conforms to the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +can interoperate with DataFusion. + +## Core Abstractions + +| Abstraction | Role | Key import | +|---|---|---| +| `SessionContext` | Entry point. Loads data, runs SQL, produces DataFrames. | `from datafusion import SessionContext` | +| `DataFrame` | Lazy query builder. Each method returns a new DataFrame. | Returned by context methods | +| `Expr` | Expression tree node (column ref, literal, function call, ...). | `from datafusion import col, lit` | +| `functions` | 290+ built-in scalar, aggregate, and window functions. | `from datafusion import functions as F` | + +## Import Conventions + +```python +from datafusion import SessionContext, col, lit +from datafusion import functions as F +``` + +## Data Loading + +```python +ctx = SessionContext() + +# From files +df = ctx.read_parquet("path/to/data.parquet") +df = ctx.read_csv("path/to/data.csv") +df = ctx.read_json("path/to/data.json") + +# From Python objects +df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}) +df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) +df = ctx.from_pandas(pandas_df) +df = ctx.from_polars(polars_df) +df = ctx.from_arrow(arrow_table) + +# From SQL +df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") +``` + +To make a DataFrame queryable by name in SQL, register it first: + +```python +ctx.register_parquet("my_table", "path/to/data.parquet") +ctx.register_csv("my_table", "path/to/data.csv") +``` + +## DataFrame Operations Quick Reference + +Every method returns a **new** DataFrame (immutable/lazy). Chain them fluently. + +### Projection + +```python +df.select("a", "b") # preferred: plain names as strings +df.select(col("a"), (col("b") + 1).alias("b_plus_1")) # use col()/Expr only when you need an expression + +df.with_column("new_col", col("a") + lit(10)) # add one column +df.with_columns( + col("a").alias("x"), + y=col("b") + lit(1), # named keyword form +) + +df.drop("unwanted_col") +df.with_column_renamed("old_name", "new_name") +``` + +When a column is referenced by name alone, pass the name as a string rather +than wrapping it in `col()`. Reach for `col()` only when the projection needs +arithmetic, aliasing, casting, or another expression operation. + +**Case sensitivity**: both `select("Name")` and `col("Name")` lowercase the +identifier. For a column whose real name has uppercase letters, embed double +quotes inside the string: `select('"MyCol"')` or `col('"MyCol"')`. Without the +inner quotes the lookup will fail with `No field named mycol`. + +### Filtering + +```python +df.filter(col("a") > 10) +df.filter(col("a") > 10, col("b") == "x") # multiple = AND +df.filter("a > 10") # SQL expression string +``` + +Raw Python values on the right-hand side of a comparison are auto-wrapped +into literals by the `Expr` operators, so prefer `col("a") > 10` over +`col("a") > lit(10)`. See the Comparisons section and pitfall #2 for the +full rule. + +### Aggregation + +```python +# GROUP BY a, compute sum(b) and count(*) +df.aggregate(["a"], [F.sum(col("b")), F.count(col("a"))]) + +# HAVING equivalent: use the filter keyword on the aggregate function +df.aggregate( + ["region"], + [F.sum(col("sales"), filter=col("sales") > lit(1000)).alias("large_sales")], +) +``` + +As with `select()`, group keys can be passed as plain name strings. Reach for +`col(...)` only when the grouping expression needs arithmetic, aliasing, +casting, or another expression operation. + +Most aggregate functions accept an optional `filter` keyword argument. When +provided, only rows where the filter expression is true contribute to the +aggregate. + +### Sorting + +```python +df.sort(col("a")) # ascending (default) +df.sort(col("a").sort(ascending=False)) # descending +df.sort(col("a").sort(nulls_first=False)) # override null placement +``` + +A plain expression passed to `sort()` is already treated as ascending. Only +reach for `col(...).sort(...)` when you need to override a default (descending +order or null placement). Writing `col("a").sort(ascending=True)` is redundant. + +### Joining + +```python +# Equi-join on shared column name +df1.join(df2, on="key") +df1.join(df2, on="key", how="left") + +# Different column names +df1.join(df2, left_on="id", right_on="fk_id", how="inner") + +# Expression-based join (supports inequality predicates) +df1.join_on(df2, col("a") == col("b"), how="inner") + +# Semi join: keep rows from left where a match exists in right (like EXISTS) +df1.join(df2, on="key", how="semi") + +# Anti join: keep rows from left where NO match exists in right (like NOT EXISTS) +df1.join(df2, on="key", how="anti") +``` + +Join types: `"inner"`, `"left"`, `"right"`, `"full"`, `"semi"`, `"anti"`. + +Inner is the default `how`. Prefer `df1.join(df2, on="key")` over +`df1.join(df2, on="key", how="inner")` — drop `how=` unless you need a +non-inner join type. + +When the two sides' join columns have different native names, use +`left_on=`/`right_on=` with the original names rather than aliasing one side +to match the other — see pitfall #7. + +### Window Functions + +```python +from datafusion import WindowFrame + +# Row number partitioned by group, ordered by value +df.window( + F.row_number( + partition_by=[col("group")], + order_by=[col("value")], + ).alias("rn") +) + +# Using a Window object for reuse +from datafusion.expr import Window + +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], +) +df.select( + col("group"), + col("value"), + F.sum(col("value")).over(win).alias("running_total"), +) + +# With explicit frame bounds +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], + window_frame=WindowFrame("rows", 0, None), # current row to unbounded following +) +``` + +### Set Operations + +```python +df1.union(df2) # UNION ALL (by position) +df1.union(df2, distinct=True) # UNION DISTINCT +df1.union_by_name(df2) # match columns by name, not position +df1.intersect(df2) # INTERSECT ALL +df1.except_all(df2) # EXCEPT ALL +``` + +### Limit and Offset + +```python +df.limit(10) # first 10 rows +df.limit(10, offset=20) # skip 20, then take 10 +``` + +### Deduplication + +```python +df.distinct() # remove duplicate rows +df.distinct_on( # keep first row per group (like DISTINCT ON in Postgres) + [col("a")], # uniqueness columns + [col("a"), col("b")], # output columns + [col("b").sort(ascending=True)], # which row to keep +) +``` + +## Executing and Collecting Results + +DataFrames are lazy until you collect. + +```python +df.show() # print formatted table to stdout +batches = df.collect() # list[pa.RecordBatch] +arr = df.collect_column("col_name") # pa.Array | pa.ChunkedArray (single column) +table = df.to_arrow_table() # pa.Table +pandas_df = df.to_pandas() # pd.DataFrame +polars_df = df.to_polars() # pl.DataFrame +py_dict = df.to_pydict() # dict[str, list] +py_list = df.to_pylist() # list[dict] +count = df.count() # int + +# Streaming +stream = df.execute_stream() # RecordBatchStream (single partition) +for batch in stream: + process(batch) +``` + +### Writing Results + +```python +df.write_parquet("output/") +df.write_csv("output/") +df.write_json("output/") +``` + +## Expression Building + +### Column References and Literals + +```python +col("column_name") # reference a column +lit(42) # integer literal +lit("hello") # string literal +lit(3.14) # float literal +lit(pa.scalar(value)) # PyArrow scalar (preserves Arrow type) +``` + +`lit()` accepts PyArrow scalars directly -- prefer this over converting Arrow +data to Python and back when working with values extracted from query results. + +### Arithmetic + +```python +col("price") * col("quantity") # multiplication +col("a") + lit(1) # addition +col("a") - col("b") # subtraction +col("a") / lit(2) # division +col("a") % lit(3) # modulo +``` + +### Date Arithmetic + +`Date32` columns require `Interval` types for arithmetic, not `Duration`. Use +PyArrow's `month_day_nano_interval` type, which takes a `(months, days, nanos)` +tuple: + +```python +import pyarrow as pa + +# Subtract 90 days from a date column +col("ship_date") - lit(pa.scalar((0, 90, 0), type=pa.month_day_nano_interval())) + +# Subtract 3 months +col("ship_date") - lit(pa.scalar((3, 0, 0), type=pa.month_day_nano_interval())) +``` + +**Important**: `lit(datetime.timedelta(days=90))` creates a `Duration(µs)` +literal, which is **not** compatible with `Date32` arithmetic. Always use +`pa.month_day_nano_interval()` for date operations. + +### Comparisons + +```python +col("a") > 10 +col("a") >= 10 +col("a") < 10 +col("a") <= 10 +col("a") == "x" +col("a") != "x" +col("a") == None # same as col("a").is_null() +col("a") != None # same as col("a").is_not_null() +``` + +Comparison operators auto-wrap the right-hand Python value into a literal, +so writing `col("a") > lit(10)` is redundant. Drop the `lit()` in +comparisons. Reach for `lit()` only when auto-wrapping does not apply — see +pitfall #2. + +### Boolean Logic + +**Important**: Python's `and`, `or`, `not` keywords do NOT work with Expr +objects. You must use the bitwise operators: + +```python +(col("a") > 1) & (col("b") < 10) # AND +(col("a") > 1) | (col("b") < 10) # OR +~(col("a") > 1) # NOT +``` + +Always wrap each comparison in parentheses when combining with `&`, `|`, `~` +because Python's operator precedence for bitwise operators is different from +logical operators. + +### Null Handling + +```python +col("a").is_null() +col("a").is_not_null() +col("a").fill_null(lit(0)) # replace NULL with a value +F.coalesce(col("a"), col("b")) # first non-null value +F.nullif(col("a"), lit(0)) # return NULL if a == 0 +``` + +### CASE / WHEN + +```python +# Simple CASE (matching on a single expression) +F.case(col("status")) + .when(lit("A"), lit("Active")) + .when(lit("I"), lit("Inactive")) + .otherwise(lit("Unknown")) + +# Searched CASE (each branch has its own predicate) +F.when(col("value") > lit(100), lit("high")) + .when(col("value") > lit(50), lit("medium")) + .otherwise(lit("low")) +``` + +### Casting + +```python +import pyarrow as pa + +col("a").cast(pa.float64()) +col("a").cast(pa.utf8()) +col("a").cast(pa.date32()) +``` + +### Aliasing + +```python +(col("a") + col("b")).alias("total") +``` + +### BETWEEN and IN + +```python +col("a").between(lit(1), lit(10)) # 1 <= a <= 10 +F.in_list(col("a"), [lit(1), lit(2), lit(3)]) # a IN (1, 2, 3) +F.in_list(col("a"), [lit(1), lit(2)], negated=True) # a NOT IN (1, 2) +``` + +### Struct and Array Access + +```python +col("struct_col")["field_name"] # access struct field +col("array_col")[0] # access array element (0-indexed) +col("array_col")[1:3] # array slice (0-indexed) +``` + +## SQL-to-DataFrame Reference + +| SQL | DataFrame API | +|---|---| +| `SELECT a, b` | `df.select("a", "b")` | +| `SELECT a, b + 1 AS c` | `df.select(col("a"), (col("b") + lit(1)).alias("c"))` | +| `SELECT *, a + 1 AS c` | `df.with_column("c", col("a") + lit(1))` | +| `WHERE a > 10` | `df.filter(col("a") > lit(10))` | +| `GROUP BY a` with `SUM(b)` | `df.aggregate(["a"], [F.sum(col("b"))])` | +| `SUM(b) FILTER (WHERE b > 100)` | `F.sum(col("b"), filter=col("b") > lit(100))` | +| `ORDER BY a DESC` | `df.sort(col("a").sort(ascending=False))` | +| `LIMIT 10 OFFSET 5` | `df.limit(10, offset=5)` | +| `DISTINCT` | `df.distinct()` | +| `a INNER JOIN b ON a.id = b.id` | `a.join(b, on="id")` | +| `a LEFT JOIN b ON a.id = b.fk` | `a.join(b, left_on="id", right_on="fk", how="left")` | +| `WHERE EXISTS (SELECT ...)` | `a.join(b, on="key", how="semi")` | +| `WHERE NOT EXISTS (SELECT ...)` | `a.join(b, on="key", how="anti")` | +| `UNION ALL` | `df1.union(df2)` | +| `UNION` (distinct) | `df1.union(df2, distinct=True)` | +| `INTERSECT` | `df1.intersect(df2)` | +| `EXCEPT` | `df1.except_all(df2)` | Review Comment: The INTERSECT / EXCEPT rows appear to describe the DISTINCT SQL forms, but the current DataFrame APIs default to keeping duplicates (`intersect(distinct=False)` and `except_all(distinct=False)`), which corresponds to `INTERSECT ALL` / `EXCEPT ALL`. Please adjust these mappings (e.g., use `distinct=True` for `INTERSECT`/`EXCEPT`, and document the `ALL` forms explicitly) so the table matches the actual semantics. ```suggestion | `INTERSECT` | `df1.intersect(df2, distinct=True)` | | `INTERSECT ALL` | `df1.intersect(df2)` | | `EXCEPT` | `df1.except_all(df2, distinct=True)` | | `EXCEPT ALL` | `df1.except_all(df2)` | ``` ########## python/datafusion/AGENTS.md: ########## @@ -0,0 +1,663 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# DataFusion Python DataFrame API Guide + +## What Is DataFusion? + +DataFusion is an **in-process query engine** built on Apache Arrow. It is not a +database -- there is no server, no connection string, and no external +dependencies. You create a `SessionContext`, point it at data (Parquet, CSV, +JSON, Arrow IPC, Pandas, Polars, or raw Python dicts/lists), and run queries +using either SQL or the DataFrame API described below. + +All data flows through **Apache Arrow**. The canonical Python implementation is +PyArrow (`pyarrow.RecordBatch` / `pyarrow.Table`), but any library that +conforms to the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) +can interoperate with DataFusion. + +## Core Abstractions + +| Abstraction | Role | Key import | +|---|---|---| +| `SessionContext` | Entry point. Loads data, runs SQL, produces DataFrames. | `from datafusion import SessionContext` | +| `DataFrame` | Lazy query builder. Each method returns a new DataFrame. | Returned by context methods | +| `Expr` | Expression tree node (column ref, literal, function call, ...). | `from datafusion import col, lit` | +| `functions` | 290+ built-in scalar, aggregate, and window functions. | `from datafusion import functions as F` | + +## Import Conventions + +```python +from datafusion import SessionContext, col, lit +from datafusion import functions as F +``` + +## Data Loading + +```python +ctx = SessionContext() + +# From files +df = ctx.read_parquet("path/to/data.parquet") +df = ctx.read_csv("path/to/data.csv") +df = ctx.read_json("path/to/data.json") + +# From Python objects +df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"]}) +df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]) +df = ctx.from_pandas(pandas_df) +df = ctx.from_polars(polars_df) +df = ctx.from_arrow(arrow_table) + +# From SQL +df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1") +``` + +To make a DataFrame queryable by name in SQL, register it first: + +```python +ctx.register_parquet("my_table", "path/to/data.parquet") +ctx.register_csv("my_table", "path/to/data.csv") +``` + +## DataFrame Operations Quick Reference + +Every method returns a **new** DataFrame (immutable/lazy). Chain them fluently. + +### Projection + +```python +df.select("a", "b") # preferred: plain names as strings +df.select(col("a"), (col("b") + 1).alias("b_plus_1")) # use col()/Expr only when you need an expression + +df.with_column("new_col", col("a") + lit(10)) # add one column +df.with_columns( + col("a").alias("x"), + y=col("b") + lit(1), # named keyword form +) + +df.drop("unwanted_col") +df.with_column_renamed("old_name", "new_name") +``` + +When a column is referenced by name alone, pass the name as a string rather +than wrapping it in `col()`. Reach for `col()` only when the projection needs +arithmetic, aliasing, casting, or another expression operation. + +**Case sensitivity**: both `select("Name")` and `col("Name")` lowercase the +identifier. For a column whose real name has uppercase letters, embed double +quotes inside the string: `select('"MyCol"')` or `col('"MyCol"')`. Without the +inner quotes the lookup will fail with `No field named mycol`. + +### Filtering + +```python +df.filter(col("a") > 10) +df.filter(col("a") > 10, col("b") == "x") # multiple = AND +df.filter("a > 10") # SQL expression string +``` + +Raw Python values on the right-hand side of a comparison are auto-wrapped +into literals by the `Expr` operators, so prefer `col("a") > 10` over +`col("a") > lit(10)`. See the Comparisons section and pitfall #2 for the +full rule. + +### Aggregation + +```python +# GROUP BY a, compute sum(b) and count(*) +df.aggregate(["a"], [F.sum(col("b")), F.count(col("a"))]) + +# HAVING equivalent: use the filter keyword on the aggregate function +df.aggregate( + ["region"], + [F.sum(col("sales"), filter=col("sales") > lit(1000)).alias("large_sales")], +) +``` + +As with `select()`, group keys can be passed as plain name strings. Reach for +`col(...)` only when the grouping expression needs arithmetic, aliasing, +casting, or another expression operation. + +Most aggregate functions accept an optional `filter` keyword argument. When +provided, only rows where the filter expression is true contribute to the +aggregate. + +### Sorting + +```python +df.sort(col("a")) # ascending (default) +df.sort(col("a").sort(ascending=False)) # descending +df.sort(col("a").sort(nulls_first=False)) # override null placement +``` + +A plain expression passed to `sort()` is already treated as ascending. Only +reach for `col(...).sort(...)` when you need to override a default (descending +order or null placement). Writing `col("a").sort(ascending=True)` is redundant. + +### Joining + +```python +# Equi-join on shared column name +df1.join(df2, on="key") +df1.join(df2, on="key", how="left") + +# Different column names +df1.join(df2, left_on="id", right_on="fk_id", how="inner") + +# Expression-based join (supports inequality predicates) +df1.join_on(df2, col("a") == col("b"), how="inner") + +# Semi join: keep rows from left where a match exists in right (like EXISTS) +df1.join(df2, on="key", how="semi") + +# Anti join: keep rows from left where NO match exists in right (like NOT EXISTS) +df1.join(df2, on="key", how="anti") +``` + +Join types: `"inner"`, `"left"`, `"right"`, `"full"`, `"semi"`, `"anti"`. + +Inner is the default `how`. Prefer `df1.join(df2, on="key")` over +`df1.join(df2, on="key", how="inner")` — drop `how=` unless you need a +non-inner join type. + +When the two sides' join columns have different native names, use +`left_on=`/`right_on=` with the original names rather than aliasing one side +to match the other — see pitfall #7. + +### Window Functions + +```python +from datafusion import WindowFrame + +# Row number partitioned by group, ordered by value +df.window( + F.row_number( + partition_by=[col("group")], + order_by=[col("value")], + ).alias("rn") +) + +# Using a Window object for reuse +from datafusion.expr import Window + +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], +) +df.select( + col("group"), + col("value"), + F.sum(col("value")).over(win).alias("running_total"), +) + +# With explicit frame bounds +win = Window( + partition_by=[col("group")], + order_by=[col("value").sort(ascending=True)], + window_frame=WindowFrame("rows", 0, None), # current row to unbounded following +) +``` + +### Set Operations + +```python +df1.union(df2) # UNION ALL (by position) +df1.union(df2, distinct=True) # UNION DISTINCT +df1.union_by_name(df2) # match columns by name, not position +df1.intersect(df2) # INTERSECT ALL +df1.except_all(df2) # EXCEPT ALL +``` + +### Limit and Offset + +```python +df.limit(10) # first 10 rows +df.limit(10, offset=20) # skip 20, then take 10 +``` + +### Deduplication + +```python +df.distinct() # remove duplicate rows +df.distinct_on( # keep first row per group (like DISTINCT ON in Postgres) + [col("a")], # uniqueness columns + [col("a"), col("b")], # output columns + [col("b").sort(ascending=True)], # which row to keep +) +``` + +## Executing and Collecting Results + +DataFrames are lazy until you collect. + +```python +df.show() # print formatted table to stdout +batches = df.collect() # list[pa.RecordBatch] +arr = df.collect_column("col_name") # pa.Array | pa.ChunkedArray (single column) +table = df.to_arrow_table() # pa.Table +pandas_df = df.to_pandas() # pd.DataFrame +polars_df = df.to_polars() # pl.DataFrame +py_dict = df.to_pydict() # dict[str, list] +py_list = df.to_pylist() # list[dict] +count = df.count() # int + +# Streaming +stream = df.execute_stream() # RecordBatchStream (single partition) +for batch in stream: + process(batch) +``` + +### Writing Results + +```python +df.write_parquet("output/") +df.write_csv("output/") +df.write_json("output/") +``` + +## Expression Building + +### Column References and Literals + +```python +col("column_name") # reference a column +lit(42) # integer literal +lit("hello") # string literal +lit(3.14) # float literal +lit(pa.scalar(value)) # PyArrow scalar (preserves Arrow type) +``` + +`lit()` accepts PyArrow scalars directly -- prefer this over converting Arrow +data to Python and back when working with values extracted from query results. + +### Arithmetic + +```python +col("price") * col("quantity") # multiplication +col("a") + lit(1) # addition +col("a") - col("b") # subtraction +col("a") / lit(2) # division +col("a") % lit(3) # modulo +``` + +### Date Arithmetic + +`Date32` columns require `Interval` types for arithmetic, not `Duration`. Use +PyArrow's `month_day_nano_interval` type, which takes a `(months, days, nanos)` +tuple: + +```python +import pyarrow as pa + +# Subtract 90 days from a date column +col("ship_date") - lit(pa.scalar((0, 90, 0), type=pa.month_day_nano_interval())) + +# Subtract 3 months +col("ship_date") - lit(pa.scalar((3, 0, 0), type=pa.month_day_nano_interval())) +``` + +**Important**: `lit(datetime.timedelta(days=90))` creates a `Duration(µs)` +literal, which is **not** compatible with `Date32` arithmetic. Always use +`pa.month_day_nano_interval()` for date operations. + +### Comparisons + +```python +col("a") > 10 +col("a") >= 10 +col("a") < 10 +col("a") <= 10 +col("a") == "x" +col("a") != "x" +col("a") == None # same as col("a").is_null() +col("a") != None # same as col("a").is_not_null() +``` + +Comparison operators auto-wrap the right-hand Python value into a literal, +so writing `col("a") > lit(10)` is redundant. Drop the `lit()` in +comparisons. Reach for `lit()` only when auto-wrapping does not apply — see +pitfall #2. + +### Boolean Logic + +**Important**: Python's `and`, `or`, `not` keywords do NOT work with Expr +objects. You must use the bitwise operators: + +```python +(col("a") > 1) & (col("b") < 10) # AND +(col("a") > 1) | (col("b") < 10) # OR +~(col("a") > 1) # NOT +``` + +Always wrap each comparison in parentheses when combining with `&`, `|`, `~` +because Python's operator precedence for bitwise operators is different from +logical operators. + +### Null Handling + +```python +col("a").is_null() +col("a").is_not_null() +col("a").fill_null(lit(0)) # replace NULL with a value +F.coalesce(col("a"), col("b")) # first non-null value +F.nullif(col("a"), lit(0)) # return NULL if a == 0 +``` + +### CASE / WHEN + +```python +# Simple CASE (matching on a single expression) +F.case(col("status")) + .when(lit("A"), lit("Active")) + .when(lit("I"), lit("Inactive")) + .otherwise(lit("Unknown")) + +# Searched CASE (each branch has its own predicate) +F.when(col("value") > lit(100), lit("high")) + .when(col("value") > lit(50), lit("medium")) + .otherwise(lit("low")) Review Comment: The Searched CASE example is not valid Python as written: the chained `.when(...)` / `.otherwise(...)` calls are on new lines without parentheses or explicit line continuation. Please format the snippet so it is syntactically valid when pasted into a Python REPL/script. ```suggestion ( F.case(col("status")) .when(lit("A"), lit("Active")) .when(lit("I"), lit("Inactive")) .otherwise(lit("Unknown")) ) # Searched CASE (each branch has its own predicate) ( F.when(col("value") > lit(100), lit("high")) .when(col("value") > lit(50), lit("medium")) .otherwise(lit("low")) ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
