This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f5bc94  MINOR: Remove unused Ballista query execution code path (#732)
2f5bc94 is described below

commit 2f5bc94a9206ab6b0bf9f443c4a756d1f01d566e
Author: Andy Grove <[email protected]>
AuthorDate: Fri Jul 16 07:52:20 2021 -0600

    MINOR: Remove unused Ballista query execution code path (#732)
    
    * Remove unused code path
    
    * Remove proto defs
---
 ballista/rust/core/proto/ballista.proto            |  6 --
 ballista/rust/core/src/client.rs                   | 51 --------------
 .../rust/core/src/serde/scheduler/from_proto.rs    | 18 -----
 ballista/rust/core/src/serde/scheduler/mod.rs      |  2 -
 ballista/rust/core/src/serde/scheduler/to_proto.rs |  4 --
 ballista/rust/executor/src/flight_service.rs       | 82 +---------------------
 6 files changed, 2 insertions(+), 161 deletions(-)

diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index 1c2328e..0575460 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -699,12 +699,6 @@ message KeyValuePair {
 message Action {
 
   oneof ActionType {
-    // Execute a logical query plan
-    LogicalPlanNode query = 1;
-
-    // Execute one partition of a physical query plan
-    ExecutePartition execute_partition = 2;
-
     // Fetch a partition from an executor
     PartitionId fetch_partition = 3;
   }
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index c8267c8..2df4145 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -75,57 +75,6 @@ impl BallistaClient {
         Ok(Self { flight_client })
     }
 
-    /// Execute one partition of a physical query plan against the executor
-    pub async fn execute_partition(
-        &mut self,
-        job_id: String,
-        stage_id: usize,
-        partition_id: Vec<usize>,
-        plan: Arc<dyn ExecutionPlan>,
-    ) -> Result<Vec<ExecutePartitionResult>> {
-        let action = Action::ExecutePartition(ExecutePartition {
-            job_id,
-            stage_id,
-            partition_id,
-            plan,
-            shuffle_locations: Default::default(),
-        });
-        let stream = self.execute_action(&action).await?;
-        let batches = collect(stream).await?;
-
-        batches
-            .iter()
-            .map(|batch| {
-                if batch.num_rows() != 1 {
-                    Err(BallistaError::General(
-                        "execute_partition received wrong number of rows".to_owned(),
-                    ))
-                } else {
-                    let path = batch
-                        .column(0)
-                        .as_any()
-                        .downcast_ref::<StringArray>()
-                        .expect(
-                            "execute_partition expected column 0 to be a StringArray",
-                        );
-
-                    let stats = batch
-                        .column(1)
-                        .as_any()
-                        .downcast_ref::<StructArray>()
-                        .expect(
-                            "execute_partition expected column 1 to be a StructArray",
-                        );
-
-                    Ok(ExecutePartitionResult::new(
-                        path.value(0),
-                        PartitionStats::from_arrow_struct_array(stats),
-                    ))
-                }
-            })
-            .collect::<Result<Vec<_>>>()
-    }
-
     /// Fetch a partition from an executor
     pub async fn fetch_partition(
         &mut self,
diff --git a/ballista/rust/core/src/serde/scheduler/from_proto.rs b/ballista/rust/core/src/serde/scheduler/from_proto.rs
index 4631b2e..73f8f53 100644
--- a/ballista/rust/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/from_proto.rs
@@ -32,24 +32,6 @@ impl TryInto<Action> for protobuf::Action {
 
     fn try_into(self) -> Result<Action, Self::Error> {
         match self.action_type {
-            Some(ActionType::ExecutePartition(partition)) => {
-                Ok(Action::ExecutePartition(ExecutePartition::new(
-                    partition.job_id,
-                    partition.stage_id as usize,
-                    partition.partition_id.iter().map(|n| *n as usize).collect(),
-                    partition
-                        .plan
-                        .as_ref()
-                        .ok_or_else(|| {
-                            BallistaError::General(
-                                "PhysicalPlanNode in ExecutePartition is missing"
-                                    .to_owned(),
-                            )
-                        })?
-                        .try_into()?,
-                    HashMap::new(),
-                )))
-            }
             Some(ActionType::FetchPartition(partition)) => {
                 Ok(Action::FetchPartition(partition.try_into()?))
             }
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index cbe1a31..fa2c1b8 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -35,8 +35,6 @@ pub mod to_proto;
 /// Action that can be sent to an executor
 #[derive(Debug, Clone)]
 pub enum Action {
-    /// Execute a query and store the results in memory
-    ExecutePartition(ExecutePartition),
     /// Collect a shuffle partition
     FetchPartition(PartitionId),
 }
diff --git a/ballista/rust/core/src/serde/scheduler/to_proto.rs b/ballista/rust/core/src/serde/scheduler/to_proto.rs
index 40ca907..c3f2046 100644
--- a/ballista/rust/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/rust/core/src/serde/scheduler/to_proto.rs
@@ -29,10 +29,6 @@ impl TryInto<protobuf::Action> for Action {
 
     fn try_into(self) -> Result<protobuf::Action, Self::Error> {
         match self {
-            Action::ExecutePartition(partition) => Ok(protobuf::Action {
-                action_type: Some(ActionType::ExecutePartition(partition.try_into()?)),
-                settings: vec![],
-            }),
             Action::FetchPartition(partition_id) => Ok(protobuf::Action {
                action_type: Some(ActionType::FetchPartition(partition_id.into())),
                 settings: vec![],
diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs
index 99424b6..7325287 100644
--- a/ballista/rust/executor/src/flight_service.rs
+++ b/ballista/rust/executor/src/flight_service.rs
@@ -25,7 +25,7 @@ use std::sync::Arc;
 use crate::executor::Executor;
 use ballista_core::error::BallistaError;
 use ballista_core::serde::decode_protobuf;
-use ballista_core::serde::scheduler::{Action as BallistaAction, PartitionStats};
+use ballista_core::serde::scheduler::Action as BallistaAction;
 
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
@@ -33,18 +33,13 @@ use arrow_flight::{
     PutResult, SchemaResult, Ticket,
 };
 use datafusion::arrow::{
-    datatypes::{DataType, Field, Schema},
-    error::ArrowError,
-    ipc::reader::FileReader,
-    ipc::writer::IpcWriteOptions,
+    error::ArrowError, ipc::reader::FileReader, ipc::writer::IpcWriteOptions,
     record_batch::RecordBatch,
 };
-use datafusion::physical_plan::displayable;
 use futures::{Stream, StreamExt};
 use log::{info, warn};
 use std::io::{Read, Seek};
 use tokio::sync::mpsc::channel;
-use tokio::task::JoinHandle;
 use tokio::{
     sync::mpsc::{Receiver, Sender},
     task,
@@ -92,79 +87,6 @@ impl FlightService for BallistaFlightService {
            decode_protobuf(&ticket.ticket).map_err(|e| from_ballista_err(&e))?;
 
         match &action {
-            BallistaAction::ExecutePartition(partition) => {
-                info!(
-                    "ExecutePartition: job={}, stage={}, partition={:?}\n{}",
-                    partition.job_id,
-                    partition.stage_id,
-                    partition.partition_id,
-                    displayable(partition.plan.as_ref()).indent().to_string()
-                );
-
-                let mut tasks: Vec<JoinHandle<Result<_, BallistaError>>> = vec![];
-                for &part in &partition.partition_id {
-                    let partition = partition.clone();
-                    let executor = self.executor.clone();
-                    tasks.push(tokio::spawn(async move {
-                        let results = executor
-                            .execute_partition(
-                                partition.job_id.clone(),
-                                partition.stage_id,
-                                part,
-                                partition.plan.clone(),
-                            )
-                            .await?;
-                        let results = vec![results];
-
-                        let mut flights: Vec<Result<FlightData, Status>> = vec![];
-                        let options = arrow::ipc::writer::IpcWriteOptions::default();
-
-                        let mut batches: Vec<Result<FlightData, Status>> = results
-                            .iter()
-                            .flat_map(|batch| create_flight_iter(batch, &options))
-                            .collect();
-
-                        // append batch vector to schema vector, so that the first message sent is the schema
-                        flights.append(&mut batches);
-
-                        Ok(flights)
-                    }));
-                }
-
-                // wait for all partitions to complete
-                let results = futures::future::join_all(tasks).await;
-
-                // get results
-                let mut flights: Vec<Result<FlightData, Status>> = vec![];
-
-                // add an initial FlightData message that sends schema
-                let options = arrow::ipc::writer::IpcWriteOptions::default();
-                let stats = PartitionStats::default();
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
-                let schema_flight_data =
-                    arrow_flight::utils::flight_data_from_arrow_schema(
-                        schema.as_ref(),
-                        &options,
-                    );
-                flights.push(Ok(schema_flight_data));
-
-                // collect statistics from each executed partition
-                for result in results {
-                    let result = result.map_err(|e| {
-                        Status::internal(format!("Ballista Error: {:?}", e))
-                    })?;
-                    let batches = result.map_err(|e| {
-                        Status::internal(format!("Ballista Error: {:?}", e))
-                    })?;
-                    flights.extend_from_slice(&batches);
-                }
-
-                let output = futures::stream::iter(flights);
-                Ok(Response::new(Box::pin(output) as Self::DoGetStream))
-            }
             BallistaAction::FetchPartition(partition_id) => {
                // fetch a partition that was previously executed by this executor
                 info!("FetchPartition {:?}", partition_id);

Reply via email to