This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new df21442 fix(rust/sedona,python/sedonadb): Ensure empty batches are not included in RecordBatchReader output (#207)
df21442 is described below
commit df214426366a07e2bc1c46295672c8110b1511c9
Author: Dewey Dunnington <[email protected]>
AuthorDate: Mon Oct 13 03:05:38 2025 -0500
fix(rust/sedona,python/sedonadb): Ensure empty batches are not included in RecordBatchReader output (#207)
---
python/sedonadb/src/reader.rs | 21 ++++++++++-----
python/sedonadb/tests/test_dataframe.py | 36 +++++++++++++++++++++++--
rust/sedona/src/reader.rs | 47 ++++++++++++++++++++++++++++++---
3 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/python/sedonadb/src/reader.rs b/python/sedonadb/src/reader.rs
index b00333a..4c13e7d 100644
--- a/python/sedonadb/src/reader.rs
+++ b/python/sedonadb/src/reader.rs
@@ -43,12 +43,21 @@ impl Iterator for PySedonaStreamReader {
type Item = std::result::Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
- match wait_for_future_from_rust(&self.runtime, self.stream.try_next())
{
- Ok(maybe_batch) => match maybe_batch {
- Ok(maybe_batch) => maybe_batch.map(Ok),
- Err(err) =>
Some(Err(ArrowError::ExternalError(Box::new(err)))),
- },
- Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
+ loop {
+ match wait_for_future_from_rust(&self.runtime,
self.stream.try_next()) {
+ Ok(Ok(maybe_batch)) => match maybe_batch {
+ Some(batch) => {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ return Some(Ok(batch));
+ }
+ None => return None,
+ },
+ Ok(Err(df_err)) => return
Some(Err(ArrowError::ExternalError(Box::new(df_err)))),
+ Err(py_err) => return
Some(Err(ArrowError::ExternalError(Box::new(py_err)))),
+ }
}
}
}
diff --git a/python/sedonadb/tests/test_dataframe.py b/python/sedonadb/tests/test_dataframe.py
index 60ac995..8d488df 100644
--- a/python/sedonadb/tests/test_dataframe.py
+++ b/python/sedonadb/tests/test_dataframe.py
@@ -14,15 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
+import tempfile
+from pathlib import Path
+
import geoarrow.pyarrow as ga
import geoarrow.types as gat
import geopandas.testing
import pandas as pd
-from pathlib import Path
import pyarrow as pa
import pytest
import sedonadb
-import tempfile
+from sedonadb.testing import skip_if_not_exists
def test_dataframe_from_dataframe(con):
@@ -255,6 +258,35 @@ def test_dataframe_to_arrow(con):
df.to_arrow_table(schema=pa.schema({}))
+def test_dataframe_to_arrow_empty_batches(con, geoarrow_data):
+ # It's difficult to trigger this with a simpler example
+ # https://github.com/apache/sedona-db/issues/156
+ path_water_junc = (
+ geoarrow_data / "ns-water" / "files" /
"ns-water_water-junc_geo.parquet"
+ )
+ path_water_point = (
+ geoarrow_data / "ns-water" / "files" /
"ns-water_water-point_geo.parquet"
+ )
+ skip_if_not_exists(path_water_junc)
+ skip_if_not_exists(path_water_point)
+
+ con.read_parquet(path_water_junc).to_view("junc", overwrite=True)
+ con.read_parquet(path_water_point).to_view("point", overwrite=True)
+ con.sql("""SELECT geometry FROM junc WHERE "OBJECTID" = 1814""").to_view(
+ "junc_filter", overwrite=True
+ )
+
+ joined = con.sql("""
+ SELECT "OBJECTID", "FEAT_CODE", point.geometry
+ FROM point
+ JOIN junc_filter ON ST_DWithin(junc_filter.geometry, point.geometry,
10000)
+ """)
+
+ reader = pa.RecordBatchReader.from_stream(joined)
+ batch_rows = [len(batch) for batch in reader]
+ assert batch_rows == [24]
+
+
def test_dataframe_to_pandas(con):
# Check with a geometry column
df_with_geo = con.sql("SELECT 1 as one, ST_GeomFromWKT('POINT (0 1)') as
geom")
diff --git a/rust/sedona/src/reader.rs b/rust/sedona/src/reader.rs
index 374f9e0..2b4673f 100644
--- a/rust/sedona/src/reader.rs
+++ b/rust/sedona/src/reader.rs
@@ -41,9 +41,20 @@ impl Iterator for SedonaStreamReader {
type Item = std::result::Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
- match self.runtime.block_on(self.stream.try_next()) {
- Ok(maybe_batch) => maybe_batch.map(Ok),
- Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
+ loop {
+ match self.runtime.block_on(self.stream.try_next()) {
+ Ok(maybe_batch) => match maybe_batch {
+ Some(batch) => {
+ if batch.num_rows() == 0 {
+ continue;
+ }
+
+ return Some(Ok(batch));
+ }
+ None => return None,
+ },
+ Err(err) => return
Some(Err(ArrowError::ExternalError(Box::new(err)))),
+ }
}
}
}
@@ -57,7 +68,9 @@ impl RecordBatchReader for SedonaStreamReader {
#[cfg(test)]
mod test {
+ use arrow_array::record_batch;
use arrow_schema::{DataType, Field, Schema};
+ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use crate::context::SedonaContext;
@@ -82,4 +95,32 @@ mod test {
assert_eq!(reader.next().unwrap().unwrap(), expected_batches[0]);
assert!(reader.next().is_none());
}
+
+ #[test]
+ fn reader_empty_chunks() {
+ let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+
+ let batch0 = record_batch!(
+ ("a", Int32, [1, 2, 3]),
+ ("b", Float64, [Some(4.0), None, Some(5.0)])
+ )
+ .expect("created batch");
+ let schema = batch0.schema();
+
+ let batch1 = RecordBatch::new_empty(schema.clone());
+ let batch2 = batch0.clone();
+
+ let stream = futures::stream::iter(vec![
+ Ok(batch0.clone()),
+ Ok(batch1.clone()),
+ Ok(batch2.clone()),
+ ]);
+ let adapter = RecordBatchStreamAdapter::new(schema, stream);
+ let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
+
+ let mut reader = SedonaStreamReader::new(runtime, batch_stream);
+ assert_eq!(reader.next().unwrap().unwrap(), batch0);
+ assert_eq!(reader.next().unwrap().unwrap(), batch2);
+ assert!(reader.next().is_none());
+ }
}