This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2e8fcd4 ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns DataFrame
2e8fcd4 is described below
commit 2e8fcd418229c8dcd86cd60952d8fef692ddc742
Author: Andy Grove <[email protected]>
AuthorDate: Sat Aug 22 13:25:48 2020 -0600
ARROW-9762: [Rust] [DataFusion] ExecutionContext::sql now returns DataFrame
I need this change so that I can have Ballista use the DataFusion DataFrame
trait and start testing the extension points for the physical planner.
Closes #8027 from andygrove/ARROW-9762
Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
rust/datafusion/benches/aggregate_query_sql.rs | 3 ++-
rust/datafusion/examples/csv_sql.rs | 3 ++-
rust/datafusion/examples/parquet_sql.rs | 3 ++-
rust/datafusion/src/bin/repl.rs | 3 ++-
rust/datafusion/src/execution/context.rs | 24 ++++++++----------------
rust/datafusion/src/execution/dataframe_impl.rs | 8 ++++++--
6 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/rust/datafusion/benches/aggregate_query_sql.rs b/rust/datafusion/benches/aggregate_query_sql.rs
index b42e7fc..d4a82c8 100644
--- a/rust/datafusion/benches/aggregate_query_sql.rs
+++ b/rust/datafusion/benches/aggregate_query_sql.rs
@@ -32,7 +32,8 @@ use datafusion::execution::context::ExecutionContext;
fn aggregate_query(ctx: &mut ExecutionContext, sql: &str) {
// execute the query
- let results = ctx.sql(&sql).unwrap();
+ let df = ctx.sql(&sql).unwrap();
+ let results = df.collect().unwrap();
// display the relation
for _batch in results {}
diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs
index 771d99b..97085f8 100644
--- a/rust/datafusion/examples/csv_sql.rs
+++ b/rust/datafusion/examples/csv_sql.rs
@@ -36,12 +36,13 @@ fn main() -> Result<()> {
)?;
// execute the query
- let results = ctx.sql(
+ let df = ctx.sql(
"SELECT c1, MIN(c12), MAX(c12) \
FROM aggregate_test_100 \
WHERE c11 > 0.1 AND c11 < 0.9 \
GROUP BY c1",
)?;
+ let results = df.collect()?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs
index 6359023..cc9ab968 100644
--- a/rust/datafusion/examples/parquet_sql.rs
+++ b/rust/datafusion/examples/parquet_sql.rs
@@ -36,11 +36,12 @@ fn main() -> Result<()> {
)?;
// execute the query
- let results = ctx.sql(
+ let df = ctx.sql(
"SELECT int_col, double_col, CAST(date_string_col as VARCHAR) \
FROM alltypes_plain \
WHERE id > 1 AND tinyint_col < double_col",
)?;
+ let results = df.collect()?;
// print the results
pretty::print_batches(&results)?;
diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs
index 74d4320..d93401b 100644
--- a/rust/datafusion/src/bin/repl.rs
+++ b/rust/datafusion/src/bin/repl.rs
@@ -103,7 +103,8 @@ fn is_exit_command(line: &str) -> bool {
fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> {
let now = Instant::now();
- let results = ctx.sql(&sql)?;
+ let df = ctx.sql(&sql)?;
+ let results = df.collect()?;
if results.is_empty() {
println!(
diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs
index 6a0b150..ca87c2e 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -133,15 +133,8 @@ impl ExecutionContext {
/// Execute a SQL query and produce a Relation (a schema-aware iterator over a series
/// of RecordBatch instances)
- pub fn sql(&mut self, sql: &str) -> Result<Vec<RecordBatch>> {
+ pub fn sql(&mut self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let plan = self.create_logical_plan(sql)?;
- return self.collect_plan(&plan);
- }
-
- /// Executes a logical plan and produce a Relation (a schema-aware iterator over a series
- /// of RecordBatch instances). This function is intended for internal use and should not be
- /// called directly.
- pub fn collect_plan(&mut self, plan: &LogicalPlan) -> Result<Vec<RecordBatch>> {
match plan {
LogicalPlan::CreateExternalTable {
ref schema,
@@ -158,11 +151,13 @@ impl ExecutionContext {
.schema(&schema)
.has_header(*has_header),
)?;
- Ok(vec![])
+ let plan = LogicalPlanBuilder::empty().build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location)?;
- Ok(vec![])
+ let plan = LogicalPlanBuilder::empty().build()?;
+ Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
_ => Err(ExecutionError::ExecutionError(format!(
"Unsupported file type {:?}.",
@@ -170,11 +165,7 @@ impl ExecutionContext {
))),
},
- plan => {
- let plan = self.optimize(&plan)?;
- let plan = self.create_physical_plan(&plan)?;
- Ok(self.collect(plan.as_ref())?)
- }
+ plan => Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))),
}
}
@@ -1095,7 +1086,8 @@ mod tests {
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_physical_planner(Arc::new(MyPhysicalPlanner {})),
);
- ctx.sql("SELECT 1").expect_err("query not supported");
+ let df = ctx.sql("SELECT 1")?;
+ df.collect().expect_err("query not supported");
Ok(())
}
diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs
index 4698a84..d53fb38 100644
--- a/rust/datafusion/src/execution/dataframe_impl.rs
+++ b/rust/datafusion/src/execution/dataframe_impl.rs
@@ -104,9 +104,13 @@ impl DataFrame for DataFrameImpl {
self.plan.clone()
}
+ // Convert the logical plan represented by this DataFrame into a physical plan and
+ // execute it
fn collect(&self) -> Result<Vec<RecordBatch>> {
- let mut ctx = ExecutionContext::from(self.ctx_state.clone());
- ctx.collect_plan(&self.plan.clone())
+ let ctx = ExecutionContext::from(self.ctx_state.clone());
+ let plan = ctx.optimize(&self.plan)?;
+ let plan = ctx.create_physical_plan(&plan)?;
+ Ok(ctx.collect(plan.as_ref())?)
}
/// Returns the schema from the logical plan