alamb commented on code in PR #14057: URL: https://github.com/apache/datafusion/pull/14057#discussion_r1912435293
########## datafusion/common/src/dfschema.rs: ########## @@ -106,37 +106,175 @@ pub type DFSchemaRef = Arc<DFSchema>; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. + functional_dependencies: FunctionalDependencies, + /// metadata columns + metadata: Option<QualifiedSchema>, +} + +pub const METADATA_OFFSET: usize = usize::MAX >> 1; Review Comment: Can you please document what this is and how it relates to `DFSchema::inner` ########## datafusion/core/tests/sql/metadata_columns.rs: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +use arrow::compute::concat_batches; +use arrow_array::{ArrayRef, UInt64Array}; +use arrow_schema::SchemaBuilder; +use async_trait::async_trait; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::csv::CsvSerializer; +use datafusion::datasource::file_format::write::BatchSerializer; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, +}; +use datafusion::prelude::*; + +use datafusion::catalog::Session; +use datafusion_common::METADATA_OFFSET; +use itertools::Itertools; + +/// A User, with an id and a bank account +#[derive(Clone, Debug)] +struct User { + id: u8, + bank_account: u64, +} + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone)] +pub struct CustomDataSource { + inner: Arc<Mutex<CustomDataSourceInner>>, + metadata_columns: SchemaRef, +} + +struct CustomDataSourceInner { + data: Vec<User>, +} + +impl Debug for CustomDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("custom_db") + } +} + +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec<usize>>, + schema: SchemaRef, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(CustomExec::new(projections, schema, self.clone()))) + } + + pub(crate) fn populate_users(&self) { + self.add_user(User { + id: 
1, + bank_account: 9_000, + }); + self.add_user(User { + id: 2, + bank_account: 100, + }); + self.add_user(User { + id: 3, + bank_account: 1_000, + }); + } + + fn add_user(&self, user: User) { + let mut inner = self.inner.lock().unwrap(); + inner.data.push(user); + } +} + +impl Default for CustomDataSource { + fn default() -> Self { + CustomDataSource { + inner: Arc::new(Mutex::new(CustomDataSourceInner { + data: Default::default(), + })), + metadata_columns: Arc::new(Schema::new(vec![Field::new( + "_rowid", + DataType::UInt64, + false, + )])), + } + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } + + fn metadata_columns(&self) -> Option<SchemaRef> { + Some(self.metadata_columns.clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec<usize>>, + // filters and limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option<usize>, + ) -> Result<Arc<dyn ExecutionPlan>> { + let mut schema = self.schema(); + let size = schema.fields.len(); + if let Some(metadata) = self.metadata_columns() { + let mut builder = SchemaBuilder::from(schema.as_ref()); + for f in metadata.fields.iter() { + builder.try_merge(f)?; + } + schema = Arc::new(builder.finish()); + } + + let projection = match projection { + Some(projection) => { + let projection = projection + .iter() + .map(|idx| { + if *idx >= METADATA_OFFSET { + *idx - METADATA_OFFSET + size + } else { + *idx + } + }) + .collect_vec(); + Some(projection) + } + None => None, + }; + return self.create_physical_plan(projection.as_ref(), schema).await; + } +} + +#[derive(Debug, Clone)] +struct CustomExec { + db: CustomDataSource, + projected_schema: 
SchemaRef, + cache: PlanProperties, +} + +impl CustomExec { + fn new( + projections: Option<&Vec<usize>>, + schema: SchemaRef, + db: CustomDataSource, + ) -> Self { + let projected_schema = project_schema(&schema, projections).unwrap(); + let cache = Self::compute_properties(projected_schema.clone()); + Self { + db, + projected_schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. + fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "CustomExec") + } +} + +impl ExecutionPlan for CustomExec { + fn name(&self) -> &'static str { + "CustomExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let users: Vec<User> = { + let db = self.db.inner.lock().unwrap(); + db.data.clone() + }; + + let mut id_array = UInt8Builder::with_capacity(users.len()); + let mut account_array = UInt64Builder::with_capacity(users.len()); + let len = users.len() as u64; + + for user in users { + id_array.append_value(user.id); + account_array.append_value(user.bank_account); + } + + let id_array = id_array.finish(); + let account_array = account_array.finish(); + let rowid_array = UInt64Array::from_iter_values(0_u64..len); + + let arrays = self + .projected_schema + 
.fields + .iter() + .map(|f| match f.name().as_str() { + "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef, + "id" => Arc::new(id_array.clone()) as ArrayRef, + "bank_account" => Arc::new(account_array.clone()) as ArrayRef, + _ => panic!("cannot reach here"), + }) + .collect(); + + Ok(Box::pin(MemoryStream::try_new( + vec![RecordBatch::try_new(self.projected_schema.clone(), arrays)?], + self.schema(), + None, + )?)) + } +} + +#[tokio::test] +async fn select_metadata_column() { + // Verify SessionContext::with_sql_options errors appropriately + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let db = CustomDataSource::default(); + db.populate_users(); + // ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + ctx.register_table("test", Arc::new(db)).unwrap(); + // disallow ddl + let options = SQLOptions::new().with_allow_ddl(false); + + let show_columns = "show columns from test;"; + let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap(); + let all_batchs = df_columns + .select(vec![col("column_name"), col("data_type")]) + .unwrap() + .collect() + .await + .unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + assert_eq!(batch.num_rows(), 2); + let serializer = CsvSerializer::new().with_header(false); Review Comment: To check the results, can you please use `assert_batches_eq` instead of converting to CSV? That is 1. more consistent with the rest of the codebase 2. easier to read 3. easier to update For example: https://github.com/apache/datafusion/blob/167c11e6587e368bf1965a93f61fe9c8952b6313/datafusion/core/tests/sql/select.rs#L69-L95 ########## datafusion/common/src/dfschema.rs: ########## @@ -106,37 +106,175 @@ pub type DFSchemaRef = Arc<DFSchema>; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. 
+ functional_dependencies: FunctionalDependencies, + /// metadata columns + metadata: Option<QualifiedSchema>, +} + +pub const METADATA_OFFSET: usize = usize::MAX >> 1; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QualifiedSchema { Review Comment: Please document what this struct is used for ########## datafusion/catalog/src/table.rs: ########## @@ -55,6 +55,11 @@ pub trait TableProvider: Debug + Sync + Send { /// Get a reference to the schema for this table fn schema(&self) -> SchemaRef; + /// Get metadata columns of this table. + fn metadata_columns(&self) -> Option<SchemaRef> { Review Comment: Can you please document this better -- specifically: 1. A link to the prior art (spark metadata columns) 2. A brief summary of what metadata columns are used for and an example (you can copy the content from the spark docs) ########## datafusion/common/src/dfschema.rs: ########## @@ -106,37 +106,175 @@ pub type DFSchemaRef = Arc<DFSchema>; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. + functional_dependencies: FunctionalDependencies, + /// metadata columns + metadata: Option<QualifiedSchema>, +} + +pub const METADATA_OFFSET: usize = usize::MAX >> 1; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QualifiedSchema { /// Inner Arrow schema reference. - inner: SchemaRef, + schema: SchemaRef, /// Optional qualifiers for each column in this schema. In the same order as /// the `self.inner.fields()` field_qualifiers: Vec<Option<TableReference>>, - /// Stores functional dependencies in the schema. 
- functional_dependencies: FunctionalDependencies, +} + +impl QualifiedSchema { + pub fn empty() -> Self { + Self { + schema: Arc::new(Schema::new([])), + field_qualifiers: vec![], + } + } + + pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Self { + QualifiedSchema { + schema, + field_qualifiers, + } + } + + pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self { + let field_qualifiers = schema + .fields() + .iter() + .map(|_| Some(table_name.clone())) + .collect(); + Self::new(schema, field_qualifiers) + } + + pub fn is_empty(&self) -> bool { + self.schema.fields.is_empty() + } + + pub fn len(&self) -> usize { + self.schema.fields.len() + } + + pub fn qualified_fields_with_unqualified_name( + &self, + name: &str, + ) -> Vec<(Option<&TableReference>, &Field)> { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(qualifier, field)| (qualifier, field.as_ref())) + .collect() + } + + /// Iterate over the qualifiers and fields in the DFSchema + pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> { + self.field_qualifiers + .iter() + .zip(self.schema.fields().iter()) + .map(|(qualifier, field)| (qualifier.as_ref(), field)) + } + + pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { + self.fields() + .iter() + .filter(|field| field.name() == name) + .map(|f| f.as_ref()) + .collect() + } + + /// Get a list of fields + pub fn fields(&self) -> &Fields { + &self.schema.fields + } + + /// Returns an immutable reference of a specific `Field` instance selected using an + /// offset within the internal `fields` vector + pub fn field(&self, i: usize) -> &Field { + &self.schema.fields[i] + } + + /// Returns an immutable reference of a specific `Field` instance selected using an + /// offset within the internal `fields` vector and its qualifier + pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) { + (self.field_qualifiers[i].as_ref(), 
self.field(i)) + } + + pub fn field_with_qualified_name( + &self, + qualifier: &TableReference, + name: &str, + ) -> Option<&Field> { + let mut matches = self + .iter() + .filter(|(q, f)| match q { + Some(field_q) => qualifier.resolved_eq(field_q) && f.name() == name, + None => false, + }) + .map(|(_, f)| f.as_ref()); + matches.next() + } + + pub fn index_of_column_by_name( + &self, + qualifier: Option<&TableReference>, + name: &str, + ) -> Option<usize> { + let mut matches = self + .iter() + .enumerate() + .filter(|(_, (q, f))| match (qualifier, q) { + // field to lookup is qualified. + // current field is qualified and not shared between relations, compare both + // qualifier and name. + (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name, + // field to lookup is qualified but current field is unqualified. + (Some(_), None) => false, + // field to lookup is unqualified, no need to compare qualifier + (None, Some(_)) | (None, None) => f.name() == name, + }) + .map(|(idx, _)| idx); + matches.next() + } + + pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> { + self.field_qualifiers[i].as_ref() + } } impl DFSchema { /// Creates an empty `DFSchema` pub fn empty() -> Self { Self { - inner: Arc::new(Schema::new([])), - field_qualifiers: vec![], + inner: QualifiedSchema::empty(), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, } } + pub fn metadata_schema(&self) -> &Option<QualifiedSchema> { Review Comment: Please add documentation -- imagine you are someone using this API and are not familiar with metadata_schema or the content of this API. I think you would want a short summary of what this is and then a link to the full details ########## datafusion/core/tests/sql/metadata_columns.rs: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +use arrow::compute::concat_batches; +use arrow_array::{ArrayRef, UInt64Array}; +use arrow_schema::SchemaBuilder; +use async_trait::async_trait; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::csv::CsvSerializer; +use datafusion::datasource::file_format::write::BatchSerializer; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, +}; +use datafusion::prelude::*; + +use datafusion::catalog::Session; +use datafusion_common::METADATA_OFFSET; +use itertools::Itertools; + +/// A User, with an id and a bank account Review Comment: This is actually quite a cool example of using metadata
index Eventually I think it would be great to add an example in https://github.com/apache/datafusion/tree/main/datafusion-examples ########## datafusion/common/src/dfschema.rs: ########## @@ -106,37 +106,175 @@ pub type DFSchemaRef = Arc<DFSchema>; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. + functional_dependencies: FunctionalDependencies, + /// metadata columns Review Comment: Can you provide more documentation here to document what these are (perhaps adding a link to the higher level description you write on `TableProvider::metadata_columns`) ########## datafusion/common/src/dfschema.rs: ########## @@ -106,37 +106,175 @@ pub type DFSchemaRef = Arc<DFSchema>; /// ``` #[derive(Debug, Clone, PartialEq, Eq)] pub struct DFSchema { + inner: QualifiedSchema, + /// Stores functional dependencies in the schema. + functional_dependencies: FunctionalDependencies, + /// metadata columns + metadata: Option<QualifiedSchema>, +} + +pub const METADATA_OFFSET: usize = usize::MAX >> 1; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QualifiedSchema { /// Inner Arrow schema reference. - inner: SchemaRef, + schema: SchemaRef, /// Optional qualifiers for each column in this schema. In the same order as /// the `self.inner.fields()` field_qualifiers: Vec<Option<TableReference>>, - /// Stores functional dependencies in the schema. 
- functional_dependencies: FunctionalDependencies, +} + +impl QualifiedSchema { + pub fn empty() -> Self { + Self { + schema: Arc::new(Schema::new([])), + field_qualifiers: vec![], + } + } + + pub fn new(schema: SchemaRef, field_qualifiers: Vec<Option<TableReference>>) -> Self { + QualifiedSchema { + schema, + field_qualifiers, + } + } + + pub fn new_with_table(schema: SchemaRef, table_name: &TableReference) -> Self { + let field_qualifiers = schema + .fields() + .iter() + .map(|_| Some(table_name.clone())) + .collect(); + Self::new(schema, field_qualifiers) + } + + pub fn is_empty(&self) -> bool { + self.schema.fields.is_empty() + } + + pub fn len(&self) -> usize { + self.schema.fields.len() + } + + pub fn qualified_fields_with_unqualified_name( + &self, + name: &str, + ) -> Vec<(Option<&TableReference>, &Field)> { + self.iter() + .filter(|(_, field)| field.name() == name) + .map(|(qualifier, field)| (qualifier, field.as_ref())) + .collect() + } + + /// Iterate over the qualifiers and fields in the DFSchema + pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> { + self.field_qualifiers + .iter() + .zip(self.schema.fields().iter()) + .map(|(qualifier, field)| (qualifier.as_ref(), field)) + } + + pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> { + self.fields() + .iter() + .filter(|field| field.name() == name) + .map(|f| f.as_ref()) + .collect() + } + + /// Get a list of fields + pub fn fields(&self) -> &Fields { + &self.schema.fields + } + + /// Returns an immutable reference of a specific `Field` instance selected using an + /// offset within the internal `fields` vector + pub fn field(&self, i: usize) -> &Field { + &self.schema.fields[i] + } + + /// Returns an immutable reference of a specific `Field` instance selected using an + /// offset within the internal `fields` vector and its qualifier + pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) { + (self.field_qualifiers[i].as_ref(), 
self.field(i)) + } + + pub fn field_with_qualified_name( + &self, + qualifier: &TableReference, + name: &str, + ) -> Option<&Field> { + let mut matches = self + .iter() + .filter(|(q, f)| match q { + Some(field_q) => qualifier.resolved_eq(field_q) && f.name() == name, + None => false, + }) + .map(|(_, f)| f.as_ref()); + matches.next() + } + + pub fn index_of_column_by_name( + &self, + qualifier: Option<&TableReference>, + name: &str, + ) -> Option<usize> { + let mut matches = self + .iter() + .enumerate() + .filter(|(_, (q, f))| match (qualifier, q) { + // field to lookup is qualified. + // current field is qualified and not shared between relations, compare both + // qualifier and name. + (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name, + // field to lookup is qualified but current field is unqualified. + (Some(_), None) => false, + // field to lookup is unqualified, no need to compare qualifier + (None, Some(_)) | (None, None) => f.name() == name, + }) + .map(|(idx, _)| idx); + matches.next() + } + + pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> { + self.field_qualifiers[i].as_ref() + } } impl DFSchema { /// Creates an empty `DFSchema` pub fn empty() -> Self { Self { - inner: Arc::new(Schema::new([])), - field_qualifiers: vec![], + inner: QualifiedSchema::empty(), functional_dependencies: FunctionalDependencies::empty(), + metadata: None, } } + pub fn metadata_schema(&self) -> &Option<QualifiedSchema> { + &self.metadata + } + /// Return a reference to the inner Arrow [`Schema`] /// /// Note this does not have the qualifier information pub fn as_arrow(&self) -> &Schema { - self.inner.as_ref() + self.inner.schema.as_ref() } /// Return a reference to the inner Arrow [`SchemaRef`] /// /// Note this does not have the qualifier information pub fn inner(&self) -> &SchemaRef { - &self.inner + &self.inner.schema + } + + pub fn with_metadata_schema( Review Comment: Can we please document these APIs ########## 
datafusion/core/tests/sql/metadata_columns.rs: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::fmt::{self, Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +use arrow::compute::concat_batches; +use arrow_array::{ArrayRef, UInt64Array}; +use arrow_schema::SchemaBuilder; +use async_trait::async_trait; +use datafusion::arrow::array::{UInt64Builder, UInt8Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::csv::CsvSerializer; +use datafusion::datasource::file_format::write::BatchSerializer; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result; +use datafusion::execution::context::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::memory::MemoryStream; +use datafusion::physical_plan::{ + project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PlanProperties, SendableRecordBatchStream, +}; +use datafusion::prelude::*; + +use datafusion::catalog::Session; +use 
datafusion_common::METADATA_OFFSET; +use itertools::Itertools; + +/// A User, with an id and a bank account +#[derive(Clone, Debug)] +struct User { + id: u8, + bank_account: u64, +} + +/// A custom datasource, used to represent a datastore with a single index +#[derive(Clone)] +pub struct CustomDataSource { + inner: Arc<Mutex<CustomDataSourceInner>>, + metadata_columns: SchemaRef, +} + +struct CustomDataSourceInner { + data: Vec<User>, +} + +impl Debug for CustomDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.write_str("custom_db") + } +} + +impl CustomDataSource { + pub(crate) async fn create_physical_plan( + &self, + projections: Option<&Vec<usize>>, + schema: SchemaRef, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(Arc::new(CustomExec::new(projections, schema, self.clone()))) + } + + pub(crate) fn populate_users(&self) { + self.add_user(User { + id: 1, + bank_account: 9_000, + }); + self.add_user(User { + id: 2, + bank_account: 100, + }); + self.add_user(User { + id: 3, + bank_account: 1_000, + }); + } + + fn add_user(&self, user: User) { + let mut inner = self.inner.lock().unwrap(); + inner.data.push(user); + } +} + +impl Default for CustomDataSource { + fn default() -> Self { + CustomDataSource { + inner: Arc::new(Mutex::new(CustomDataSourceInner { + data: Default::default(), + })), + metadata_columns: Arc::new(Schema::new(vec![Field::new( + "_rowid", + DataType::UInt64, + false, + )])), + } + } +} + +#[async_trait] +impl TableProvider for CustomDataSource { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + SchemaRef::new(Schema::new(vec![ + Field::new("id", DataType::UInt8, false), + Field::new("bank_account", DataType::UInt64, true), + ])) + } + + fn metadata_columns(&self) -> Option<SchemaRef> { + Some(self.metadata_columns.clone()) + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec<usize>>, + // filters and 
limit can be used here to inject some push-down operations if needed + _filters: &[Expr], + _limit: Option<usize>, + ) -> Result<Arc<dyn ExecutionPlan>> { + let mut schema = self.schema(); + let size = schema.fields.len(); + if let Some(metadata) = self.metadata_columns() { + let mut builder = SchemaBuilder::from(schema.as_ref()); + for f in metadata.fields.iter() { + builder.try_merge(f)?; + } + schema = Arc::new(builder.finish()); + } + + let projection = match projection { + Some(projection) => { + let projection = projection + .iter() + .map(|idx| { + if *idx >= METADATA_OFFSET { + *idx - METADATA_OFFSET + size + } else { + *idx + } + }) + .collect_vec(); + Some(projection) + } + None => None, + }; + return self.create_physical_plan(projection.as_ref(), schema).await; + } +} + +#[derive(Debug, Clone)] +struct CustomExec { + db: CustomDataSource, + projected_schema: SchemaRef, + cache: PlanProperties, +} + +impl CustomExec { + fn new( + projections: Option<&Vec<usize>>, + schema: SchemaRef, + db: CustomDataSource, + ) -> Self { + let projected_schema = project_schema(&schema, projections).unwrap(); + let cache = Self::compute_properties(projected_schema.clone()); + Self { + db, + projected_schema, + cache, + } + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
+ fn compute_properties(schema: SchemaRef) -> PlanProperties { + let eq_properties = EquivalenceProperties::new(schema); + PlanProperties::new( + eq_properties, + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for CustomExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result { + write!(f, "CustomExec") + } +} + +impl ExecutionPlan for CustomExec { + fn name(&self) -> &'static str { + "CustomExec" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + let users: Vec<User> = { + let db = self.db.inner.lock().unwrap(); + db.data.clone() + }; + + let mut id_array = UInt8Builder::with_capacity(users.len()); + let mut account_array = UInt64Builder::with_capacity(users.len()); + let len = users.len() as u64; + + for user in users { + id_array.append_value(user.id); + account_array.append_value(user.bank_account); + } + + let id_array = id_array.finish(); + let account_array = account_array.finish(); + let rowid_array = UInt64Array::from_iter_values(0_u64..len); + + let arrays = self + .projected_schema + .fields + .iter() + .map(|f| match f.name().as_str() { + "_rowid" => Arc::new(rowid_array.clone()) as ArrayRef, + "id" => Arc::new(id_array.clone()) as ArrayRef, + "bank_account" => Arc::new(account_array.clone()) as ArrayRef, + _ => panic!("cannot reach here"), + }) + .collect(); + + Ok(Box::pin(MemoryStream::try_new( + vec![RecordBatch::try_new(self.projected_schema.clone(), arrays)?], + self.schema(), + None, + )?)) + } +} + +#[tokio::test] +async fn select_metadata_column() { + // Verify 
SessionContext::with_sql_options errors appropriately + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_information_schema(true), + ); + let db = CustomDataSource::default(); + db.populate_users(); + // ctx.sql("CREATE TABLE test (x int)").await.unwrap(); + ctx.register_table("test", Arc::new(db)).unwrap(); + // disallow ddl + let options = SQLOptions::new().with_allow_ddl(false); + + let show_columns = "show columns from test;"; + let df_columns = ctx.sql_with_options(show_columns, options).await.unwrap(); + let all_batchs = df_columns + .select(vec![col("column_name"), col("data_type")]) + .unwrap() + .collect() + .await + .unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + assert_eq!(batch.num_rows(), 2); + let serializer = CsvSerializer::new().with_header(false); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "id,UInt8\nbank_account,UInt64\n"); + let select0 = "SELECT * FROM test order by id"; + let df0 = ctx.sql_with_options(select0, options).await.unwrap(); + assert!(!df0.schema().has_column_with_unqualified_name("_rowid")); + + let all_batchs = df0.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "1,9000\n2,100\n3,1000\n"); + + let select1 = "SELECT _rowid FROM test order by _rowid"; + let df1 = ctx.sql_with_options(select1, options).await.unwrap(); + assert_eq!(df1.schema().field_names(), vec!["test._rowid"]); + + let all_batchs = df1.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "0\n1\n2\n"); + + let select2 = "SELECT _rowid, id FROM test order by _rowid"; + let df2 = ctx.sql_with_options(select2, options).await.unwrap(); + assert_eq!(df2.schema().field_names(), vec!["test._rowid", "test.id"]); + + let 
all_batchs = df2.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "0,1\n1,2\n2,3\n"); + + let select3 = "SELECT _rowid, id FROM test WHERE _rowid = 0"; + let df3 = ctx.sql_with_options(select3, options).await.unwrap(); + assert_eq!(df3.schema().field_names(), vec!["test._rowid", "test.id"]); + + let all_batchs = df3.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "0,1\n"); + + let select4 = "SELECT _rowid FROM test LIMIT 1"; + let df4 = ctx.sql_with_options(select4, options).await.unwrap(); + assert_eq!(df4.schema().field_names(), vec!["test._rowid"]); + + let all_batchs = df4.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "0\n"); + + let select5 = "SELECT _rowid, id FROM test WHERE _rowid % 2 = 1"; + let df5 = ctx.sql_with_options(select5, options).await.unwrap(); + assert_eq!(df5.schema().field_names(), vec!["test._rowid", "test.id"]); + + let all_batchs = df5.collect().await.unwrap(); + let batch = concat_batches(&all_batchs[0].schema(), &all_batchs).unwrap(); + let bytes = serializer.serialize(batch, true).unwrap(); + assert_eq!(bytes, "1,2\n"); Review Comment: Can we please also add a test for more than one metadata column? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org