mertak-synnada commented on code in PR #14411:

@@ -0,0 +1,1320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//! This file implements the [`OnDemandRepartitionExec`] operator, which maps N input
+//! partitions to M output partitions based on a partitioning scheme, 
+//! maintaining the order of the input rows in the output.
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{any::Any, vec};
+use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use super::{
+    BatchPartitioner, DisplayAs, ExecutionPlanProperties, MaybeBatch,
+    RepartitionExecBase, RepartitionMetrics, SendableRecordBatchStream,
+};
+use crate::common::SharedMemoryReservation;
+use crate::execution_plan::CardinalityEffect;
+use crate::metrics::BaselineMetrics;
+use crate::repartition::distributor_channels::{
+    DistributionReceiver, DistributionSender,
+};
+use crate::repartition::RepartitionExecStateBuilder;
+use crate::sorts::streaming_merge::StreamingMergeBuilder;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use async_channel::{Receiver, Sender};
+use datafusion_common::{internal_datafusion_err, Result};
+use datafusion_common_runtime::SpawnedTask;
+use datafusion_execution::memory_pool::MemoryConsumer;
+use datafusion_execution::TaskContext;
+use datafusion_common::HashMap;
+use futures::stream::Stream;
+use futures::{ready, FutureExt, StreamExt, TryStreamExt};
+use log::{debug, trace};
+use parking_lot::Mutex;
+type PartitionChannels = (Vec<Sender<usize>>, Vec<Receiver<usize>>);
+#[derive(Debug, Clone)]
+pub struct OnDemandRepartitionExec {

Review Comment:
   I think it would be better to document here how `OnDemandRepartitionExec` and
`OnDemandRepartitionStream` work and how they differ from round-robin
repartitioning, especially the channel usage, perhaps with a diagram.
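A minimal sketch of what such documentation could illustrate. It assumes, based on the `PartitionChannels` alias of `Sender<usize>`/`Receiver<usize>`, that each output partition announces readiness by sending its own index, and that the input task then sends the next batch only to a partition that asked. It uses `std::sync::mpsc` and threads instead of `async_channel` and tasks, `Vec<u32>` stands in for `RecordBatch`, and `on_demand_distribute` is a hypothetical name. By contrast, round-robin repartitioning pushes batch `i` to partition `i % M` whether or not that partition is ready to consume it.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical pull-based distributor (a sketch, not the PR's code).
/// `Vec<u32>` stands in for a `RecordBatch`.
fn on_demand_distribute(input: Vec<Vec<u32>>, num_outputs: usize) -> Vec<Vec<Vec<u32>>> {
    // Shared request channel: an output partition sends its own index
    // whenever it is ready for one more batch.
    let (request_tx, request_rx): (Sender<usize>, Receiver<usize>) = channel();

    // One data channel per output partition; `None` means "input exhausted".
    let mut data_txs: Vec<Sender<Option<Vec<u32>>>> = Vec::new();
    let mut data_rxs: Vec<Receiver<Option<Vec<u32>>>> = Vec::new();
    for _ in 0..num_outputs {
        let (tx, rx) = channel();
        data_txs.push(tx);
        data_rxs.push(rx);
    }

    // Input task: block on a request, then hand the next batch to the requester.
    let producer = thread::spawn(move || {
        let mut batches = input.into_iter();
        let mut finished = 0;
        while finished < num_outputs {
            let partition = match request_rx.recv() {
                Ok(p) => p,
                Err(_) => break, // every output partition has gone away
            };
            let next = batches.next();
            if next.is_none() {
                finished += 1; // this partition stops requesting after `None`
            }
            // A failed send only means that consumer already hung up.
            let _ = data_txs[partition].send(next);
        }
    });

    // Output partitions: request one batch, wait for it, repeat.
    let consumers: Vec<_> = data_rxs
        .into_iter()
        .enumerate()
        .map(|(partition, rx)| {
            let request_tx = request_tx.clone();
            thread::spawn(move || {
                let mut received = Vec::new();
                loop {
                    if request_tx.send(partition).is_err() {
                        break;
                    }
                    match rx.recv() {
                        Ok(Some(batch)) => received.push(batch),
                        Ok(None) | Err(_) => break,
                    }
                }
                received
            })
        })
        .collect();
    // Drop the original sender so only the consumers keep the request channel open.
    drop(request_tx);

    let outputs: Vec<Vec<Vec<u32>>> =
        consumers.into_iter().map(|h| h.join().unwrap()).collect();
    producer.join().unwrap();
    outputs
}
```

Because a partition has at most one outstanding request, a slow partition never builds up a backlog, and since a single task hands batches out in order, each partition still sees its batches in input order.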

Review Comment:
   What is the reason for this initialization?

Review Comment:
   We could create a new metrics struct for this operator, since fetch,
repartition, and send are not sequential. Alternatively, we could keep the same
metrics type and add the elapsed nanoseconds for each batch; however, this
operator does not seem to measure repartition or fetch time either. Could you
please take a look at this?
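One way to read the per-batch suggestion, as a hypothetical sketch: each phase gets its own accumulated duration and only the span of that phase is timed, so fetch and send do not have to run back-to-back for the numbers to be meaningful. `OnDemandMetrics`, `timed`, and `drive` are illustrative names, and plain `std::time::Duration` stands in for DataFusion's metric types.

```rust
use std::time::{Duration, Instant};

/// Hypothetical metrics for the operator: each phase accumulates its own
/// elapsed time, so the phases need not be measured back-to-back.
#[derive(Debug, Default)]
struct OnDemandMetrics {
    fetch_time: Duration,
    send_time: Duration,
    batches: usize,
}

/// Runs `f`, adds its wall-clock duration to `slot`, and returns its result.
fn timed<T>(slot: &mut Duration, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = f();
    *slot += start.elapsed();
    out
}

/// Moves batches from `input` to an output buffer, timing each phase per batch.
fn drive(input: Vec<Vec<u32>>) -> (Vec<Vec<u32>>, OnDemandMetrics) {
    let mut metrics = OnDemandMetrics::default();
    let mut source = input.into_iter();
    let mut sink = Vec::new();
    loop {
        // Only the call that produces the next batch counts as fetch time.
        let Some(batch) = timed(&mut metrics.fetch_time, || source.next()) else {
            break;
        };
        // Only the hand-off to the output counts as send time.
        timed(&mut metrics.send_time, || sink.push(batch));
        metrics.batches += 1;
    }
    (sink, metrics)
}
```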

+        match ready!(self.receiver.recv().poll_unpin(cx)) {

Review Comment:
   This part could be simplified once the debug logs are removed: each branch
seems to set `self.is_requested` to `false` and return the inner `Some` or `None`.
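A sketch of the simplification, assuming the value produced by `recv()` is an `Option<Option<T>>` (outer `None`: channel closed; inner `None`: input exhausted) and that every arm, including the closed-channel one, clears `is_requested` and returns the inner value. `StreamState` and `handle_received` are hypothetical names; in the real stream the value would come from `ready!(self.receiver.recv().poll_unpin(cx))`.

```rust
use std::task::Poll;

/// Hypothetical stand-in for the stream; only the request flag matters here.
struct StreamState {
    is_requested: bool,
}

impl StreamState {
    /// `received` models what the receive future resolved to.
    fn handle_received<T>(&mut self, received: Option<Option<T>>) -> Poll<Option<T>> {
        // Every branch reset the flag, so do it once up front.
        self.is_requested = false;
        // Collapse "channel closed" and "input exhausted" into a single `None`.
        Poll::Ready(received.flatten())
    }
}
```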

Review Comment:
   This part is not valid for `OnDemandRepartitionExec`

+        debug!(

Review Comment:
   I believe we can remove these `debug!` logs.

+            // Input is done

Review Comment:
   This comment only seems valid for the `None` case, doesn't it?

