This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2f5bc94 MINOR: Remove unused Ballista query execution code path (#732)
2f5bc94 is described below
commit 2f5bc94a9206ab6b0bf9f443c4a756d1f01d566e
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 16 07:52:20 2021 -0600
MINOR: Remove unused Ballista query execution code path (#732)
* Remove unused code path
* Remove proto defs
---
ballista/rust/core/proto/ballista.proto | 6 --
ballista/rust/core/src/client.rs | 51 --------------
.../rust/core/src/serde/scheduler/from_proto.rs | 18 -----
ballista/rust/core/src/serde/scheduler/mod.rs | 2 -
ballista/rust/core/src/serde/scheduler/to_proto.rs | 4 --
ballista/rust/executor/src/flight_service.rs | 82 +---------------------
6 files changed, 2 insertions(+), 161 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1c2328e..0575460 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -699,12 +699,6 @@ message KeyValuePair {
message Action {
oneof ActionType {
- // Execute a logical query plan
- LogicalPlanNode query = 1;
-
- // Execute one partition of a physical query plan
- ExecutePartition execute_partition = 2;
-
// Fetch a partition from an executor
PartitionId fetch_partition = 3;
}
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index c8267c8..2df4145 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -75,57 +75,6 @@ impl BallistaClient {
Ok(Self { flight_client })
}
- /// Execute one partition of a physical query plan against the executor
- pub async fn execute_partition(
- &mut self,
- job_id: String,
- stage_id: usize,
- partition_id: Vec<usize>,
- plan: Arc<dyn ExecutionPlan>,
- ) -> Result<Vec<ExecutePartitionResult>> {
- let action = Action::ExecutePartition(ExecutePartition {
- job_id,
- stage_id,
- partition_id,
- plan,
- shuffle_locations: Default::default(),
- });
- let stream = self.execute_action(&action).await?;
- let batches = collect(stream).await?;
-
- batches
- .iter()
- .map(|batch| {
- if batch.num_rows() != 1 {
- Err(BallistaError::General(
- "execute_partition received wrong number of rows".to_owned(),
- ))
- } else {
- let path = batch
- .column(0)
- .as_any()
- .downcast_ref::<StringArray>()
- .expect(
- "execute_partition expected column 0 to be a StringArray",
- );
-
- let stats = batch
- .column(1)
- .as_any()
- .downcast_ref::<StructArray>()
- .expect(
- "execute_partition expected column 1 to be a StructArray",
- );
-
- Ok(ExecutePartitionResult::new(
- path.value(0),
- PartitionStats::from_arrow_struct_array(stats),
- ))
- }
- })
- .collect::<Result<Vec<_>>>()
- }
-
/// Fetch a partition from an executor
pub async fn fetch_partition(
&mut self,
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 4631b2e..73f8f53 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -32,24 +32,6 @@ impl TryInto<Action> for protobuf::Action {
fn try_into(self) -> Result<Action, Self::Error> {
match self.action_type {
- Some(ActionType::ExecutePartition(partition)) => {
- Ok(Action::ExecutePartition(ExecutePartition::new(
- partition.job_id,
- partition.stage_id as usize,
- partition.partition_id.iter().map(|n| *n as usize).collect(),
- partition
- .plan
- .as_ref()
- .ok_or_else(|| {
- BallistaError::General(
- "PhysicalPlanNode in ExecutePartition is missing"
- .to_owned(),
- )
- })?
- .try_into()?,
- HashMap::new(),
- )))
- }
Some(ActionType::FetchPartition(partition)) => {
Ok(Action::FetchPartition(partition.try_into()?))
}
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index cbe1a31..fa2c1b8 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -35,8 +35,6 @@ pub mod to_proto;
/// Action that can be sent to an executor
#[derive(Debug, Clone)]
pub enum Action {
- /// Execute a query and store the results in memory
- ExecutePartition(ExecutePartition),
/// Collect a shuffle partition
FetchPartition(PartitionId),
}
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 40ca907..c3f2046 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -29,10 +29,6 @@ impl TryInto<protobuf::Action> for Action {
fn try_into(self) -> Result<protobuf::Action, Self::Error> {
match self {
- Action::ExecutePartition(partition) => Ok(protobuf::Action {
- action_type: Some(ActionType::ExecutePartition(partition.try_into()?)),
- settings: vec![],
- }),
Action::FetchPartition(partition_id) => Ok(protobuf::Action {
action_type: Some(ActionType::FetchPartition(partition_id.into())),
settings: vec![],
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 99424b6..7325287 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::decode_protobuf;
-use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
+use ballista_core::serde::scheduler::Action as BallistaAction;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
@@ -33,18 +33,13 @@ use arrow_flight::{
PutResult, SchemaResult, Ticket,
};
use datafusion::arrow::{
- datatypes::{DataType, Field, Schema},
- error::ArrowError,
- ipc::reader::FileReader,
- ipc::writer::IpcWriteOptions,
+ error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions,
record_batch::RecordBatch,
};
-use datafusion::physical_plan::displayable;
use futures::{Stream, StreamExt};
use log::{info, warn};
use std::io::{Read, Seek};
use tokio::sync::mpsc::channel;
-use tokio::task::JoinHandle;
use tokio::{
sync::mpsc::{Receiver, Sender},
task,
@@ -92,79 +87,6 @@ impl FlightService for BallistaFlightService {
decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?;
match &action {
- BallistaAction::ExecutePartition(partition) => {
- info!(
- "ExecutePartition: job={}, stage={}, partition={:?}\n{}",
- partition.job_id,
- partition.stage_id,
- partition.partition_id,
- displayable(partition.plan.as_ref()).indent().to_string()
- );
-
- let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
- for &part in &partition.partition_id {
- let partition = partition.clone();
- let executor = self.executor.clone();
- tasks.push(tokio::spawn(async move {
- let results = executor
- .execute_partition(
- partition.job_id.clone(),
- partition.stage_id,
- part,
- partition.plan.clone(),
- )
- .await?;
- let results = vec![results];
-
- let mut flights: Vec<Result<FlightData, Status>> = vec![];
- let options = arrow::ipc::writer::IpcWriteOptions::default();
-
- let mut batches: Vec<Result<FlightData, Status>> = results
- .iter()
- .flat_map(|batch| create_flight_iter(batch, &options))
- .collect();
-
- // append batch vector to schema vector, so that the first message sent is the schema
- flights.append(&mut batches);
-
- Ok(flights)
- }));
- }
-
- // wait for all partitions to complete
- let results = futures::future::join_all(tasks).await;
-
- // get results
- let mut flights: Vec<Result<FlightData, Status>> = vec![];
-
- // add an initial FlightData message that sends schema
- let options = arrow::ipc::writer::IpcWriteOptions::default();
- let stats = PartitionStats::default();
- let schema = Arc::new(Schema::new(vec![
- Field::new("path", DataType::Utf8, false),
- stats.arrow_struct_repr(),
- ]));
- let schema_flight_data =
- arrow_flight::utils::flight_data_from_arrow_schema(
- schema.as_ref(),
- &options,
- );
- flights.push(Ok(schema_flight_data));
-
- // collect statistics from each executed partition
- for result in results {
- let result = result.map_err(|e| {
- Status::internal(format!("Ballista Error: {:?}", e))
- })?;
- let batches = result.map_err(|e| {
- Status::internal(format!("Ballista Error: {:?}", e))
- })?;
- flights.extend_from_slice(&batches);
- }
-
- let output = futures::stream::iter(flights);
- Ok(Response::new(Box::pin(output) as Self::DoGetStream))
- }
BallistaAction::FetchPartition(partition_id) => {
// fetch a partition that was previously executed by this executor
info!("FetchPartition {:?}", partition_id);