This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4cd56743 feat: add support for generating JSON formatted substrait
plan (#1376)
4cd56743 is described below
commit 4cd56743e50d6b33c3d21152ec4ae87d5fe3faf4
Author: Dhanashri Prathamesh Iranna
<[email protected]>
AuthorDate: Sat Feb 14 05:19:28 2026 +0530
feat: add support for generating JSON formatted substrait plan (#1376)
* chore: add new dependencies to pyproject.toml
* chore: rename from_json to parse_json
* fix: missing import
* Fix call to internal function. Drive-by update to equality on logical
plan. Switch unit test to focus on json parsing and not byte serialization.
* fix: resolve clippy redundant closure lint in substrait.rs
---------
Co-authored-by: Tim Saucer <[email protected]>
---
Cargo.lock | 3 +-
Cargo.toml | 1 +
python/datafusion/plan.py | 6 ++++
python/datafusion/substrait.py | 20 ++++++++++++
python/tests/test_substrait.py | 73 ++++++++++++++++++++++++++++++++++++++++++
src/sql/logical.rs | 4 +--
src/substrait.rs | 15 ++++++++-
7 files changed, 118 insertions(+), 4 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 592a797b..6af46fa1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
-version = 3
+version = 4
[[package]]
name = "abi_stable"
@@ -1632,6 +1632,7 @@ dependencies = [
"pyo3-async-runtimes",
"pyo3-build-config",
"pyo3-log",
+ "serde_json",
"tokio",
"url",
"uuid",
diff --git a/Cargo.toml b/Cargo.toml
index 44bb8818..3e632baf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -62,6 +62,7 @@ datafusion-substrait = { version = "52", optional = true }
datafusion-proto = { version = "52" }
datafusion-ffi = { version = "52" }
prost = "0.14.1" # keep in line with `datafusion-substrait`
+serde_json = "1"
uuid = { version = "1.18", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false,
features = [
"local_dynamic_tls",
diff --git a/python/datafusion/plan.py b/python/datafusion/plan.py
index 0b7bebcb..fb54fd62 100644
--- a/python/datafusion/plan.py
+++ b/python/datafusion/plan.py
@@ -98,6 +98,12 @@ class LogicalPlan:
"""
return self._raw_plan.to_proto()
+ def __eq__(self, other: LogicalPlan) -> bool:
+ """Test equality."""
+ if not isinstance(other, LogicalPlan):
+ return False
+ return self._raw_plan.__eq__(other._raw_plan)
+
class ExecutionPlan:
"""Represent nodes in the DataFusion Physical Plan."""
diff --git a/python/datafusion/substrait.py b/python/datafusion/substrait.py
index f10adfb0..3115238f 100644
--- a/python/datafusion/substrait.py
+++ b/python/datafusion/substrait.py
@@ -67,6 +67,26 @@ class Plan:
"""
return self.plan_internal.encode()
+ def to_json(self) -> str:
+ """Get the JSON representation of the Substrait plan.
+
+ Returns:
+ A JSON representation of the Substrait plan.
+ """
+ return self.plan_internal.to_json()
+
+ @staticmethod
+ def from_json(json: str) -> Plan:
+ """Parse a plan from a JSON string representation.
+
+ Args:
+ json: JSON representation of a Substrait plan.
+
+ Returns:
+ Plan object representing the Substrait plan.
+ """
+ return Plan(substrait_internal.Plan.from_json(json))
+
@deprecated("Use `Plan` instead.")
class plan(Plan): # noqa: N801
diff --git a/python/tests/test_substrait.py b/python/tests/test_substrait.py
index 43aa327d..a5f59ba7 100644
--- a/python/tests/test_substrait.py
+++ b/python/tests/test_substrait.py
@@ -74,3 +74,76 @@ def test_substrait_file_serialization(ctx, tmp_path,
path_to_str):
expected_actual_plan = ss.Consumer.from_substrait_plan(ctx, actual_plan)
assert str(expected_logical_plan) == str(expected_actual_plan)
+
+
+def test_json_processing_round_trip(ctx: SessionContext):
+ ctx.register_record_batches("t", [[pa.record_batch({"a": [1]})]])
+ original_logical_plan = ctx.sql("SELECT * FROM t").logical_plan()
+
+ substrait_plan = ss.Producer.to_substrait_plan(original_logical_plan, ctx)
+ json_plan = substrait_plan.to_json()
+
+ expected = """\
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [
+ 1
+ ]
+ }
+ },
+ "input": {
+ "read": {
+ "baseSchema": {
+ "names": [
+ "a"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ },
+ "namedTable": {
+ "names": [
+ "t"
+ ]
+ }
+ }
+ },
+ "expressions": [
+ {
+ "selection": {
+ "directReference": {
+ "structField": {}
+ },
+ "rootReference": {}
+ }
+ }
+ ]
+ }
+ },
+ "names": [
+ "a"
+ ]
+ }
+ }
+ ]"""
+
+ assert expected in json_plan
+
+ round_trip_substrait_plan = ss.Plan.from_json(json_plan)
+ round_trip_logical_plan = ss.Consumer.from_substrait_plan(
+ ctx, round_trip_substrait_plan
+ )
+
+ assert round_trip_logical_plan == original_logical_plan
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index 78611819..cd2ed73d 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -66,8 +66,8 @@ use crate::expr::unnest::PyUnnest;
use crate::expr::values::PyValues;
use crate::expr::window::PyWindowExpr;
-#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass)]
-#[derive(Debug, Clone)]
+#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass, eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PyLogicalPlan {
pub(crate) plan: Arc<LogicalPlan>,
}
diff --git a/src/substrait.rs b/src/substrait.rs
index ea8eaf50..1cbf3256 100644
--- a/src/substrait.rs
+++ b/src/substrait.rs
@@ -23,7 +23,7 @@ use pyo3::prelude::*;
use pyo3::types::PyBytes;
use crate::context::PySessionContext;
-use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err};
+use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err,
to_datafusion_err};
use crate::sql::logical::PyLogicalPlan;
use crate::utils::wait_for_future;
@@ -42,6 +42,19 @@ impl PyPlan {
.map_err(PyDataFusionError::EncodeError)?;
Ok(PyBytes::new(py, &proto_bytes).into())
}
+
+ /// Get the JSON representation of the substrait plan
+ fn to_json(&self) -> PyDataFusionResult<String> {
+ let json =
serde_json::to_string_pretty(&self.plan).map_err(to_datafusion_err)?;
+ Ok(json)
+ }
+
+ /// Parse a Substrait Plan from its JSON representation
+ #[staticmethod]
+ fn from_json(json: &str) -> PyDataFusionResult<PyPlan> {
+ let plan: Plan =
serde_json::from_str(json).map_err(to_datafusion_err)?;
+ Ok(PyPlan { plan })
+ }
}
impl From<PyPlan> for Plan {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]