This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 4419338f fix(python/sedonadb): Ensure that Python UDFs executing with >1 batch do not cause deadlock (#558)
4419338f is described below

commit 4419338f98067793cf92806aa37312b48476b508
Author: Dewey Dunnington <[email protected]>
AuthorDate: Fri Jan 30 15:04:09 2026 -0600

    fix(python/sedonadb): Ensure that Python UDFs executing with >1 batch do not cause deadlock (#558)
---
 python/sedonadb/python/sedonadb/dataframe.py      |   7 +-
 python/sedonadb/src/dataframe.rs                  | 107 +++++++++++++++++-----
 python/sedonadb/src/runtime.rs                    |  20 ++--
 python/sedonadb/tests/functions/test_functions.py |   4 +-
 python/sedonadb/tests/test_dataframe.py           |   2 +-
 python/sedonadb/tests/test_udf.py                 |  20 +++-
 6 files changed, 119 insertions(+), 41 deletions(-)

diff --git a/python/sedonadb/python/sedonadb/dataframe.py b/python/sedonadb/python/sedonadb/dataframe.py
index 42980063..38ed5caf 100644
--- a/python/sedonadb/python/sedonadb/dataframe.py
+++ b/python/sedonadb/python/sedonadb/dataframe.py
@@ -253,10 +253,9 @@ class DataFrame:
         import pyarrow as pa
         import geoarrow.pyarrow  # noqa: F401
 
-        if schema is None:
-            return pa.table(self)
-        else:
-            return pa.table(self, schema=pa.schema(schema))
+        # Collects all batches into an object that exposes __arrow_c_stream__()
+        batches = self._impl.to_batches(schema)
+        return pa.table(batches)
 
     def to_pandas(
         self, geometry: Optional[str] = None
diff --git a/python/sedonadb/src/dataframe.rs b/python/sedonadb/src/dataframe.rs
index 3ae85b59..eb57f6db 100644
--- a/python/sedonadb/src/dataframe.rs
+++ b/python/sedonadb/src/dataframe.rs
@@ -18,16 +18,16 @@ use std::ffi::CString;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use arrow_array::ffi::FFI_ArrowSchema;
 use arrow_array::ffi_stream::FFI_ArrowArrayStream;
-use arrow_array::RecordBatchReader;
-use arrow_schema::Schema;
+use arrow_array::{RecordBatch, RecordBatchReader};
+use arrow_schema::{Schema, SchemaRef};
 use datafusion::catalog::MemTable;
 use datafusion::logical_expr::SortExpr;
 use datafusion::prelude::DataFrame;
-use datafusion_common::Column;
+use datafusion_common::{Column, DataFusionError};
 use datafusion_expr::{ExplainFormat, ExplainOption, Expr};
 use datafusion_ffi::table_provider::FFI_TableProvider;
+use futures::TryStreamExt;
 use pyo3::prelude::*;
 use pyo3::types::PyCapsule;
 use sedona::context::{SedonaDataFrame, SedonaWriteOptions};
@@ -38,7 +38,7 @@ use tokio::runtime::Runtime;
 
 use crate::context::InternalContext;
 use crate::error::PySedonaError;
-use crate::import_from::check_pycapsule;
+use crate::import_from::import_arrow_schema;
 use crate::reader::PySedonaStreamReader;
 use crate::runtime::wait_for_future;
 use crate::schema::PySedonaSchema;
@@ -100,14 +100,17 @@ impl InternalDataFrame {
     }
 
     fn execute<'py>(&self, py: Python<'py>) -> Result<usize, PySedonaError> {
-        let mut c = 0;
-        let stream = wait_for_future(py, &self.runtime, 
self.inner.clone().execute_stream())??;
-        let reader = PySedonaStreamReader::new(self.runtime.clone(), stream);
-        for batch in reader {
-            c += batch?.num_rows();
-        }
+        let df = self.inner.clone();
+        let count = wait_for_future(py, &self.runtime, async move {
+            let mut stream = df.execute_stream().await?;
+            let mut c = 0usize;
+            while let Some(batch) = stream.try_next().await? {
+                c += batch.num_rows();
+            }
+            Ok::<_, DataFusionError>(c)
+        })??;
 
-        Ok(c)
+        Ok(count)
     }
 
     fn count<'py>(&self, py: Python<'py>) -> Result<usize, PySedonaError> {
@@ -149,6 +152,28 @@ impl InternalDataFrame {
         ))
     }
 
+    fn to_batches<'py>(
+        &self,
+        py: Python<'py>,
+        requested_schema: Option<Bound<'py, PyAny>>,
+    ) -> Result<Batches, PySedonaError> {
+        check_py_requested_schema(requested_schema, 
self.inner.schema().as_arrow())?;
+
+        let df = self.inner.clone();
+        let batches = wait_for_future(py, &self.runtime, async move {
+            let mut stream = df.execute_stream().await?;
+            let schema = stream.schema();
+            let mut batches = Vec::new();
+            while let Some(batch) = stream.try_next().await? {
+                batches.push(batch);
+            }
+
+            Ok::<_, DataFusionError>(Batches { schema, batches })
+        })??;
+
+        Ok(batches)
+    }
+
     #[allow(clippy::too_many_arguments)]
     fn to_parquet<'py>(
         &self,
@@ -265,20 +290,9 @@ impl InternalDataFrame {
     fn __arrow_c_stream__<'py>(
         &self,
         py: Python<'py>,
-        #[allow(unused_variables)] requested_schema: Option<Bound<'py, 
PyCapsule>>,
+        requested_schema: Option<Bound<'py, PyAny>>,
     ) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
-        if let Some(requested_capsule) = requested_schema {
-            let contents = check_pycapsule(&requested_capsule, 
"arrow_schema")?;
-            let ffi_schema = unsafe { FFI_ArrowSchema::from_raw(contents as _) 
};
-            let requested_schema = Schema::try_from(&ffi_schema)?;
-            let actual_schema = self.inner.schema().as_arrow();
-            if &requested_schema != actual_schema {
-                // Eventually we can support this by inserting a cast
-                return Err(PySedonaError::SedonaPython(
-                    "Requested schema != DataFrame schema not yet 
supported".to_string(),
-                ));
-            }
-        }
+        check_py_requested_schema(requested_schema, 
self.inner.schema().as_arrow())?;
 
         let stream = wait_for_future(py, &self.runtime, 
self.inner.clone().execute_stream())??;
         let reader = PySedonaStreamReader::new(self.runtime.clone(), stream);
@@ -289,3 +303,46 @@ impl InternalDataFrame {
         Ok(PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?)
     }
 }
+
+#[pyclass]
+pub struct Batches {
+    schema: SchemaRef,
+    batches: Vec<RecordBatch>,
+}
+
+#[pymethods]
+impl Batches {
+    #[pyo3(signature = (requested_schema=None))]
+    fn __arrow_c_stream__<'py>(
+        &self,
+        py: Python<'py>,
+        requested_schema: Option<Bound<'py, PyAny>>,
+    ) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
+        check_py_requested_schema(requested_schema, &self.schema)?;
+
+        let reader = arrow_array::RecordBatchIterator::new(
+            self.batches.clone().into_iter().map(Ok),
+            self.schema.clone(),
+        );
+        let reader: Box<dyn RecordBatchReader + Send> = Box::new(reader);
+
+        let ffi_stream = FFI_ArrowArrayStream::new(reader);
+        let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
+        Ok(PyCapsule::new(py, ffi_stream, Some(stream_capsule_name))?)
+    }
+}
+
+fn check_py_requested_schema<'py>(
+    requested_schema: Option<Bound<'py, PyAny>>,
+    actual_schema: &Schema,
+) -> Result<(), PySedonaError> {
+    if let Some(requested_obj) = requested_schema {
+        let requested = import_arrow_schema(&requested_obj)?;
+        if &requested != actual_schema {
+            return Err(PySedonaError::SedonaPython(
+                "Requested schema != actual schema not yet 
supported".to_string(),
+            ));
+        }
+    }
+    Ok(())
+}
diff --git a/python/sedonadb/src/runtime.rs b/python/sedonadb/src/runtime.rs
index d6db68c7..772f90b1 100644
--- a/python/sedonadb/src/runtime.rs
+++ b/python/sedonadb/src/runtime.rs
@@ -22,13 +22,16 @@ use tokio::{runtime::Runtime, time::sleep};
 use crate::error::PySedonaError;
 
 // Adapted from datafusion-python:
-// 
https://github.com/apache/datafusion-python/blob/cbe845b1e840c78f7a9fc4d83d184a1e6f35f47c/src/utils.rs#L64
+// 
https://github.com/apache/datafusion-python/blob/7aff3635c93d5897d470642928c39c86e7851931/src/utils.rs#L80-L106
 pub fn wait_for_future<F>(py: Python, runtime: &Runtime, fut: F) -> 
Result<F::Output, PySedonaError>
 where
     F: Future + Send,
     F::Output: Send,
 {
-    const INTERVAL_CHECK_SIGNALS: Duration = Duration::from_millis(1_000);
+    const INTERVAL_CHECK_SIGNALS: Duration = Duration::from_millis(2_000);
+
+    py.run(cr"pass", None, None)?;
+    py.check_signals()?;
 
     py.allow_threads(|| {
         runtime.block_on(async {
@@ -37,7 +40,10 @@ where
                 tokio::select! {
                     res = &mut fut => break Ok(res),
                     _ = sleep(INTERVAL_CHECK_SIGNALS) => {
-                        Python::with_gil(|py| py.check_signals())?;
+                        Python::with_gil(|py| {
+                            py.run(cr"pass", None, None)?;
+                            py.check_signals()
+                        })?;
                     }
                 }
             }
@@ -52,15 +58,17 @@ where
     F: Future + Send,
     F::Output: Send,
 {
-    const INTERVAL_CHECK_SIGNALS: Duration = Duration::from_millis(1_000);
-
+    const INTERVAL_CHECK_SIGNALS: Duration = Duration::from_millis(2_000);
     runtime.block_on(async {
         tokio::pin!(fut);
         loop {
             tokio::select! {
                 res = &mut fut => break Ok(res),
                 _ = sleep(INTERVAL_CHECK_SIGNALS) => {
-                    Python::with_gil(|py| py.check_signals())?;
+                    Python::with_gil(|py| {
+                        py.run(cr"pass", None, None)?;
+                        py.check_signals()
+                    })?;
                 }
             }
         }
diff --git a/python/sedonadb/tests/functions/test_functions.py b/python/sedonadb/tests/functions/test_functions.py
index 1c16b6d3..34159360 100644
--- a/python/sedonadb/tests/functions/test_functions.py
+++ b/python/sedonadb/tests/functions/test_functions.py
@@ -16,9 +16,9 @@
 # under the License.
 import math
 
-import pyarrow
 import pytest
 import shapely
+import sedonadb
 from sedonadb.testing import PostGIS, SedonaDB, geom_or_null, val_or_null
 
 
@@ -1709,7 +1709,7 @@ def test_st_geomfromwkbunchecked_invalid_wkb(eng):
     )
 
     # Using invalid WKB elsewhere may result in undefined behavior.
-    with pytest.raises(pyarrow.lib.ArrowInvalid, match="failed to fill whole 
buffer"):
+    with pytest.raises(sedonadb._lib.SedonaError, match="failed to fill whole 
buffer"):
         eng.execute_and_collect("SELECT 
ST_AsText(ST_GeomFromWKBUnchecked(0x01))")
 
 
diff --git a/python/sedonadb/tests/test_dataframe.py b/python/sedonadb/tests/test_dataframe.py
index bb3ca782..681d6d82 100644
--- a/python/sedonadb/tests/test_dataframe.py
+++ b/python/sedonadb/tests/test_dataframe.py
@@ -261,7 +261,7 @@ def test_dataframe_to_arrow(con):
     # ...but not otherwise (yet)
     with pytest.raises(
         sedonadb._lib.SedonaError,
-        match="Requested schema != DataFrame schema not yet supported",
+        match="Requested schema != actual schema not yet supported",
     ):
         df.to_arrow_table(schema=pa.schema({}))
 
diff --git a/python/sedonadb/tests/test_udf.py b/python/sedonadb/tests/test_udf.py
index ab019f9b..4159c96f 100644
--- a/python/sedonadb/tests/test_udf.py
+++ b/python/sedonadb/tests/test_udf.py
@@ -18,6 +18,7 @@
 import pandas as pd
 import pyarrow as pa
 import pytest
+import sedonadb
 from sedonadb import udf
 
 
@@ -122,6 +123,19 @@ def test_shapely_udf(con):
         pd.DataFrame({"col": [3857]}, dtype=np.uint32),
     )
 
+    # Ensure we can collect with >1 batch without hanging
+    con.funcs.table.sd_random_geometry("Point", 20000).to_view("pts", 
overwrite=True)
+    df = con.sql(
+        "SELECT ST_Area(shapely_udf(ST_Point(0, 0), 2.0)) as col FROM pts"
+    ).to_pandas()
+    assert len(df) == 20000
+
+    # Ensure we can execute with >1 batch without hanging
+    count = con.sql(
+        "SELECT ST_Area(shapely_udf(ST_Point(0, 0), 2.0)) as col FROM pts"
+    ).execute()
+    assert count == 20000
+
 
 def test_py_sedona_value(con):
     @udf.arrow_udf(pa.int64())
@@ -170,7 +184,7 @@ def test_udf_bad_return_object(con):
 
     con.register_udf(questionable_udf)
     with pytest.raises(
-        ValueError,
+        sedonadb._lib.SedonaError,
         match="Expected result of user-defined function to return an object 
implementing __arrow_c_array__",
     ):
         con.sql("SELECT questionable_udf(123) as col").to_pandas()
@@ -183,7 +197,7 @@ def test_udf_bad_return_type(con):
 
     con.register_udf(questionable_udf)
     with pytest.raises(
-        ValueError,
+        sedonadb._lib.SedonaError,
         match=(
             "Expected result of user-defined function to "
             "return array of type Binary or its storage "
@@ -200,7 +214,7 @@ def test_udf_bad_return_length(con):
 
     con.register_udf(questionable_udf)
     with pytest.raises(
-        ValueError,
+        sedonadb._lib.SedonaError,
         match="Expected result of user-defined function to return array of 
length 1 but got 2",
     ):
         con.sql("SELECT questionable_udf(123) as col").to_pandas()

Reply via email to