Copilot commented on code in PR #398:
URL: https://github.com/apache/sedona-db/pull/398#discussion_r2575623649
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -168,10 +178,78 @@ pub fn create_geoparquet_writer_physical_plan(
);
// Create the sink
- let sink = Arc::new(ParquetSink::new(conf, parquet_options));
+ let sink_input_schema = conf.output_schema;
+ conf.output_schema = parquet_output_schema.clone();
+ let sink = Arc::new(GeoParquetSink {
+ inner: ParquetSink::new(conf, parquet_options),
+ projection: bbox_projection,
+ sink_input_schema,
+ parquet_output_schema,
+ });
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
+/// Implementation of [DataSink] that computes GeoParquet 1.1 bbox columns
+/// if needed. This is used instead of a ProjectionExec because DataFusion's
+/// optimizer rules seem to rearrange the projection in ways that cause
+/// the plan to fail <https://github.com/apache/sedona-db/issues/379>.
+#[derive(Debug)]
+struct GeoParquetSink {
+ inner: ParquetSink,
+ projection: Option<Vec<(Arc<dyn PhysicalExpr>, String)>>,
+ sink_input_schema: SchemaRef,
+ parquet_output_schema: SchemaRef,
+}
+
+impl DisplayAs for GeoParquetSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.inner.fmt_as(t, f)
+ }
+}
+
+#[async_trait]
+impl DataSink for GeoParquetSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> &SchemaRef {
+ &self.sink_input_schema
+ }
+
+ async fn write_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ if let Some(projection) = &self.projection {
+ // If we have a projection, apply it here
+ let schema = self.parquet_output_schema.clone();
+ let projection = projection.clone();
Review Comment:
The projection vector is cloned for each batch in the stream. Consider
cloning it once before the map closure to avoid repeated allocations for each
batch.
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -168,10 +178,78 @@ pub fn create_geoparquet_writer_physical_plan(
);
// Create the sink
- let sink = Arc::new(ParquetSink::new(conf, parquet_options));
+ let sink_input_schema = conf.output_schema;
+ conf.output_schema = parquet_output_schema.clone();
+ let sink = Arc::new(GeoParquetSink {
+ inner: ParquetSink::new(conf, parquet_options),
+ projection: bbox_projection,
+ sink_input_schema,
+ parquet_output_schema,
+ });
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}
+/// Implementation of [DataSink] that computes GeoParquet 1.1 bbox columns
+/// if needed. This is used instead of a ProjectionExec because DataFusion's
+/// optimizer rules seem to rearrange the projection in ways that cause
+/// the plan to fail <https://github.com/apache/sedona-db/issues/379>.
+#[derive(Debug)]
+struct GeoParquetSink {
+ inner: ParquetSink,
+ projection: Option<Vec<(Arc<dyn PhysicalExpr>, String)>>,
+ sink_input_schema: SchemaRef,
+ parquet_output_schema: SchemaRef,
+}
+
+impl DisplayAs for GeoParquetSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ self.inner.fmt_as(t, f)
+ }
+}
+
+#[async_trait]
+impl DataSink for GeoParquetSink {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> &SchemaRef {
+ &self.sink_input_schema
+ }
+
+ async fn write_all(
+ &self,
+ data: SendableRecordBatchStream,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ if let Some(projection) = &self.projection {
+ // If we have a projection, apply it here
+ let schema = self.parquet_output_schema.clone();
+ let projection = projection.clone();
+
+ let data = Box::pin(RecordBatchStreamAdapter::new(
+ schema.clone(),
+ data.map(move |batch_result| {
+ let schema = schema.clone();
+
Review Comment:
The schema is cloned for every batch. Since Arc is already cheap to clone,
this clone inside the closure is redundant - the outer schema clone at line 227
should suffice for the entire stream processing.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]