This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8f4078d ShuffleReaderExec now supports multiple locations per
partition (#541)
8f4078d is described below
commit 8f4078d83f7ea0348fa43906d26156bf8a95de4c
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jun 12 06:45:06 2021 -0600
ShuffleReaderExec now supports multiple locations per partition (#541)
* ShuffleReaderExec now supports multiple locations per partition
* Remove TODO
* avoid clone
---
ballista/rust/client/src/context.rs | 39 ++-------
ballista/rust/core/proto/ballista.proto | 7 +-
.../core/src/execution_plans/shuffle_reader.rs | 94 +++++++++++++---------
.../core/src/serde/physical_plan/from_proto.rs | 12 ++-
.../rust/core/src/serde/physical_plan/to_proto.rs | 18 +++--
ballista/rust/core/src/utils.rs | 40 ++++++++-
ballista/rust/scheduler/src/planner.rs | 2 +-
ballista/rust/scheduler/src/state/mod.rs | 6 +-
8 files changed, 130 insertions(+), 88 deletions(-)
diff --git a/ballista/rust/client/src/context.rs
b/ballista/rust/client/src/context.rs
index 4e5cc1a..695045d 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -29,21 +29,18 @@ use ballista_core::serde::protobuf::{
execute_query_params::Query, job_status, ExecuteQueryParams,
GetJobStatusParams,
GetJobStatusResult,
};
+use ballista_core::utils::WrappedStream;
use ballista_core::{
client::BallistaClient, datasource::DfTableAdapter,
utils::create_datafusion_context,
};
use datafusion::arrow::datatypes::Schema;
-use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::arrow::error::Result as ArrowResult;
-use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::TableReference;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::csv::CsvReadOptions;
use datafusion::{dataframe::DataFrame, physical_plan::RecordBatchStream};
use futures::future;
-use futures::Stream;
use futures::StreamExt;
use log::{error, info};
@@ -74,32 +71,6 @@ impl BallistaContextState {
}
}
-struct WrappedStream {
- stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send +
Sync>>,
- schema: SchemaRef,
-}
-
-impl RecordBatchStream for WrappedStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
-impl Stream for WrappedStream {
- type Item = ArrowResult<RecordBatch>;
-
- fn poll_next(
- mut self: Pin<&mut Self>,
- cx: &mut std::task::Context<'_>,
- ) -> std::task::Poll<Option<Self::Item>> {
- self.stream.poll_next_unpin(cx)
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- self.stream.size_hint()
- }
-}
-
#[allow(dead_code)]
pub struct BallistaContext {
@@ -287,10 +258,10 @@ impl BallistaContext {
.into_iter()
.collect::<Result<Vec<_>>>()?;
- let result = WrappedStream {
- stream:
Box::pin(futures::stream::iter(result).flatten()),
- schema: Arc::new(schema),
- };
+ let result = WrappedStream::new(
+ Box::pin(futures::stream::iter(result).flatten()),
+ Arc::new(schema),
+ );
break Ok(Box::pin(result));
}
};
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 85af902..5aafd00 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -489,10 +489,15 @@ message HashAggregateExecNode {
}
message ShuffleReaderExecNode {
- repeated PartitionLocation partition_location = 1;
+ repeated ShuffleReaderPartition partition = 1;
Schema schema = 2;
}
+message ShuffleReaderPartition {
+ // each partition of a shuffle read can read data from multiple locations
+ repeated PartitionLocation location = 1;
+}
+
message GlobalLimitExecNode {
PhysicalPlanNode input = 1;
uint32 limit = 2;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index db29cf1..3a7f795 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::fmt::Formatter;
use std::sync::Arc;
use std::{any::Any, pin::Pin};
@@ -22,35 +23,35 @@ use crate::client::BallistaClient;
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;
+use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
+use datafusion::arrow::error::Result as ArrowResult;
+use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan,
Partitioning};
use datafusion::{
error::{DataFusionError, Result},
physical_plan::RecordBatchStream,
};
+use futures::{future, Stream, StreamExt};
use log::info;
-use std::fmt::Formatter;
-/// ShuffleReaderExec reads partitions that have already been materialized by
an executor.
+/// ShuffleReaderExec reads partitions that have already been materialized by
a query stage
+/// being executed by an executor
#[derive(Debug, Clone)]
pub struct ShuffleReaderExec {
- // The query stage that is responsible for producing the shuffle
partitions that
- // this operator will read
- pub(crate) partition_location: Vec<PartitionLocation>,
+ /// Each partition of a shuffle can read data from multiple locations
+ pub(crate) partition: Vec<Vec<PartitionLocation>>,
pub(crate) schema: SchemaRef,
}
impl ShuffleReaderExec {
/// Create a new ShuffleReaderExec
pub fn try_new(
- partition_meta: Vec<PartitionLocation>,
+ partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
- Ok(Self {
- partition_location: partition_meta,
- schema,
- })
+ Ok(Self { partition, schema })
}
}
@@ -65,7 +66,7 @@ impl ExecutionPlan for ShuffleReaderExec {
}
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(self.partition_location.len())
+ Partitioning::UnknownPartitioning(self.partition.len())
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
partition: usize,
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
info!("ShuffleReaderExec::execute({})", partition);
- let partition_location = &self.partition_location[partition];
-
- let mut client = BallistaClient::try_new(
- &partition_location.executor_meta.host,
- partition_location.executor_meta.port,
- )
- .await
- .map_err(|e| DataFusionError::Execution(format!("Ballista Error:
{:?}", e)))?;
- client
- .fetch_partition(
- &partition_location.partition_id.job_id,
- partition_location.partition_id.stage_id,
- partition,
- )
+ let partition_locations = &self.partition[partition];
+ let result =
future::join_all(partition_locations.iter().map(fetch_partition))
.await
- .map_err(|e| DataFusionError::Execution(format!("Ballista Error:
{:?}", e)))
+ .into_iter()
+ .collect::<Result<Vec<_>>>()?;
+
+ let result = WrappedStream::new(
+ Box::pin(futures::stream::iter(result).flatten()),
+ Arc::clone(&self.schema),
+ );
+ Ok(Box::pin(result))
}
fn fmt_as(
@@ -113,22 +109,51 @@ impl ExecutionPlan for ShuffleReaderExec {
match t {
DisplayFormatType::Default => {
let loc_str = self
- .partition_location
+ .partition
.iter()
- .map(|l| {
- format!(
- "[executor={} part={}:{}:{} stats={:?}]",
- l.executor_meta.id,
- l.partition_id.job_id,
- l.partition_id.stage_id,
- l.partition_id.partition_id,
- l.partition_stats
- )
+ .map(|x| {
+ x.iter()
+ .map(|l| {
+ format!(
+ "[executor={} part={}:{}:{} stats={:?}]",
+ l.executor_meta.id,
+ l.partition_id.job_id,
+ l.partition_id.stage_id,
+ l.partition_id.partition_id,
+ l.partition_stats
+ )
+ })
+ .collect::<Vec<String>>()
+ .join(",")
})
.collect::<Vec<String>>()
- .join(",");
+ .join("\n");
write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
}
}
}
}
+
+/// Fetch a single shuffle partition location as a record batch stream.
+///
+/// Connects to the executor described by `location.executor_meta` and
+/// requests the partition identified by `location.partition_id`. Errors from
+/// connecting or fetching are both returned as `DataFusionError::Execution`.
+async fn fetch_partition(
+ location: &PartitionLocation,
+) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
+ let metadata = &location.executor_meta;
+ let partition_id = &location.partition_id;
+ let mut ballista_client =
+ BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ ballista_client
+ .fetch_partition(
+ &partition_id.job_id,
+ partition_id.stage_id as usize,
+ partition_id.partition_id as usize,
+ )
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))
+}
diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
index d49d53c..a2c9db9 100644
--- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs
@@ -25,6 +25,7 @@ use crate::error::BallistaError;
use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::protobuf::LogicalExprNode;
+use crate::serde::protobuf::ShuffleReaderPartition;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{proto_error, protobuf};
use crate::{convert_box_required, convert_required};
@@ -327,10 +328,15 @@ impl TryInto<Arc<dyn ExecutionPlan>> for
&protobuf::PhysicalPlanNode {
}
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
let schema =
Arc::new(convert_required!(shuffle_reader.schema)?);
- let partition_location: Vec<PartitionLocation> = shuffle_reader
- .partition_location
+ let partition_location: Vec<Vec<PartitionLocation>> =
shuffle_reader
+ .partition
.iter()
- .map(|p| p.clone().try_into())
+ .map(|p| {
+ p.location
+ .iter()
+ .map(|l| l.clone().try_into())
+ .collect::<Result<Vec<_>, _>>()
+ })
.collect::<Result<Vec<_>, BallistaError>>()?;
let shuffle_reader =
ShuffleReaderExec::try_new(partition_location, schema)?;
diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
index 26092e7..15d5d4b 100644
--- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs
+++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs
@@ -57,6 +57,7 @@ use protobuf::physical_plan_node::PhysicalPlanType;
use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
+use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
use datafusion::physical_plan::functions::{BuiltinScalarFunction,
ScalarFunctionExpr};
use datafusion::physical_plan::merge::MergeExec;
@@ -268,16 +269,19 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn
ExecutionPlan> {
)),
})
} else if let Some(exec) = plan.downcast_ref::<ShuffleReaderExec>() {
- let partition_location = exec
- .partition_location
- .iter()
- .map(|l| l.clone().try_into())
- .collect::<Result<_, _>>()?;
-
+ let mut partition = vec![];
+ for location in &exec.partition {
+ partition.push(protobuf::ShuffleReaderPartition {
+ location: location
+ .iter()
+ .map(|l| l.clone().try_into())
+ .collect::<Result<Vec<_>, _>>()?,
+ });
+ }
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
protobuf::ShuffleReaderExecNode {
- partition_location,
+ partition,
schema: Some(exec.schema().as_ref().into()),
},
)),
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index 4ba6ec4..b58be28 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -27,11 +27,12 @@ use crate::execution_plans::{QueryStageExec,
UnresolvedShuffleExec};
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;
+use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::{
array::{
ArrayBuilder, ArrayRef, StructArray, StructBuilder, UInt64Array,
UInt64Builder,
},
- datatypes::{DataType, Field},
+ datatypes::{DataType, Field, SchemaRef},
ipc::reader::FileReader,
ipc::writer::FileWriter,
record_batch::RecordBatch,
@@ -54,7 +55,7 @@ use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
AggregateExpr, ExecutionPlan, PhysicalExpr, RecordBatchStream,
};
-use futures::StreamExt;
+use futures::{future, Stream, StreamExt};
/// Stream data to disk in Arrow IPC format
@@ -234,3 +235,42 @@ pub fn create_datafusion_context() -> ExecutionContext {
.with_physical_optimizer_rules(rules);
ExecutionContext::with_config(config)
}
+
+/// Wraps a boxed stream of record batches together with a schema so it can
+/// be returned as a `RecordBatchStream`. Batches are passed through
+/// unchanged and are not checked against `schema`.
+pub struct WrappedStream {
+ stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send +
Sync>>,
+ schema: SchemaRef,
+}
+
+impl WrappedStream {
+ /// Wrap `stream`, reporting `schema` as the schema of its batches.
+ pub fn new(
+ stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send +
Sync>>,
+ schema: SchemaRef,
+ ) -> Self {
+ Self { stream, schema }
+ }
+}
+
+impl RecordBatchStream for WrappedStream {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+}
+
+impl Stream for WrappedStream {
+ type Item = ArrowResult<RecordBatch>;
+
+ fn poll_next(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ self.stream.poll_next_unpin(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.stream.size_hint()
+ }
+}
diff --git a/ballista/rust/scheduler/src/planner.rs
b/ballista/rust/scheduler/src/planner.rs
index 445ef9a..2ac9f61 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -186,7 +186,7 @@ impl DistributedPlanner {
pub fn remove_unresolved_shuffles(
stage: &dyn ExecutionPlan,
- partition_locations: &HashMap<usize, Vec<PartitionLocation>>,
+ partition_locations: &HashMap<usize, Vec<Vec<PartitionLocation>>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in stage.children() {
diff --git a/ballista/rust/scheduler/src/state/mod.rs
b/ballista/rust/scheduler/src/state/mod.rs
index a15efd6..506fd1c 100644
--- a/ballista/rust/scheduler/src/state/mod.rs
+++ b/ballista/rust/scheduler/src/state/mod.rs
@@ -234,7 +234,7 @@ impl SchedulerState {
let unresolved_shuffles = find_unresolved_shuffles(&plan)?;
let mut partition_locations: HashMap<
usize,
- Vec<ballista_core::serde::scheduler::PartitionLocation>,
+
Vec<Vec<ballista_core::serde::scheduler::PartitionLocation>>,
> = HashMap::new();
for unresolved_shuffle in unresolved_shuffles {
for stage_id in unresolved_shuffle.query_stage_ids {
@@ -256,7 +256,7 @@ impl SchedulerState {
let empty = vec![];
let locations =
partition_locations.entry(stage_id).or_insert(empty);
- locations.push(
+ locations.push(vec![
ballista_core::serde::scheduler::PartitionLocation {
partition_id:
ballista_core::serde::scheduler::PartitionId {
@@ -271,7 +271,7 @@ impl SchedulerState {
.clone(),
partition_stats:
PartitionStats::default(),
},
- );
+ ]);
} else {
continue 'tasks;
}