westonpace commented on code in PR #13800: URL: https://github.com/apache/datafusion/pull/13800#discussion_r1893016360
########## datafusion/catalog/src/async.rs: ########## @@ -0,0 +1,765 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference}; +use datafusion_execution::config::SessionConfig; + +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; + +/// A schema provider that looks up tables in a cache +/// +/// Instances are created by the [`AsyncSchemaProvider::resolve`] method +#[derive(Debug)] +struct ResolvedSchemaProvider { + owner_name: Option<String>, + cached_tables: HashMap<String, Arc<dyn TableProvider>>, +} +#[async_trait] +impl SchemaProvider for ResolvedSchemaProvider { + fn owner_name(&self) -> Option<&str> { + self.owner_name.as_deref() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn table_names(&self) -> Vec<String> { + self.cached_tables.keys().cloned().collect() + } + + async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { + Ok(self.cached_tables.get(name).cloned()) + } + + fn register_table( + &self, + name: String, + _table: Arc<dyn TableProvider>, + ) -> Result<Option<Arc<dyn TableProvider>>> { + not_impl_err!( 
+ "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported" + ) + } + + fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { + not_impl_err!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported") + } + + fn table_exist(&self, name: &str) -> bool { + self.cached_tables.contains_key(name) + } +} + +/// Helper class for building a [`ResolvedSchemaProvider`] +struct ResolvedSchemaProviderBuilder { + owner_name: String, + async_provider: Arc<dyn AsyncSchemaProvider>, + cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>, +} +impl ResolvedSchemaProviderBuilder { + fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) -> Self { + Self { + owner_name, + async_provider, + cached_tables: HashMap::new(), + } + } + + async fn resolve_table(&mut self, table_name: &str) -> Result<()> { + if !self.cached_tables.contains_key(table_name) { + let resolved_table = self.async_provider.table(table_name).await?; + self.cached_tables + .insert(table_name.to_string(), resolved_table); + } + Ok(()) + } + + fn finish(self) -> Arc<dyn SchemaProvider> { + let cached_tables = self + .cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + Arc::new(ResolvedSchemaProvider { + owner_name: Some(self.owner_name), + cached_tables, + }) + } +} + +/// A catalog provider that looks up schemas in a cache +/// +/// Instances are created by the [`AsyncCatalogProvider::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProvider { + cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>, +} +impl CatalogProvider for ResolvedCatalogProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema_names(&self) -> Vec<String> { + self.cached_schemas.keys().cloned().collect() + } + + fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> { + self.cached_schemas.get(name).cloned() + } +} + +/// 
Helper class for building a [`ResolvedCatalogProvider`] +struct ResolvedCatalogProviderBuilder { + cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>, + async_provider: Arc<dyn AsyncCatalogProvider>, +} +impl ResolvedCatalogProviderBuilder { + fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self { + Self { + cached_schemas: HashMap::new(), + async_provider, + } + } + fn finish(self) -> Arc<dyn CatalogProvider> { + let cached_schemas = self + .cached_schemas + .into_iter() + .filter_map(|(key, maybe_value)| { + maybe_value.map(|value| (key, value.finish())) + }) + .collect(); + Arc::new(ResolvedCatalogProvider { cached_schemas }) + } +} + +/// A catalog provider list that looks up catalogs in a cache +/// +/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProviderList { + cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>, +} +impl CatalogProviderList for ResolvedCatalogProviderList { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc<dyn CatalogProvider>, + ) -> Option<Arc<dyn CatalogProvider>> { + unimplemented!("resolved providers cannot handle registration APIs") + } + + fn catalog_names(&self) -> Vec<String> { + self.cached_catalogs.keys().cloned().collect() + } + + fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> { + self.cached_catalogs.get(name).cloned() + } +} + +/// A trait for schema providers that must resolve tables asynchronously +/// +/// The [`SchemaProvider::table`] method _is_ asynchronous. However, this is primarily for convenience and +/// it is not a good idea for this method to be slow as this will cause poor planning performance. +/// +/// It is a better idea to resolve the tables once and cache them in memory for the duration of +/// planning. This trait helps implement that pattern. 
+/// +/// After implementing this trait you can call the [`AsyncSchemaProvider::resolve`] method to get an +/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced tables. The `resolve` +/// method can be slow and asynchronous as it is only called once, before planning. +#[async_trait] +pub trait AsyncSchemaProvider: Send + Sync { + /// Return the name of the schema provided by this provider + /// + /// If a table reference's schema name does not match this name then the reference will be ignored + /// when calculating the cached set of tables (this allows other providers to supply the table) + fn name(&self) -> &str; + /// Return the name of the catalog this provider belongs to + /// + /// If a table reference's catalog name does not match this name then the reference will be ignored + /// when calculating the cached set of tables (this allows other providers to supply the table) + fn catalog_name(&self) -> &str; Review Comment: Yeah, I was a bit torn on this anyway. The problem arises when the user wants to use an AsyncSchemaProvider or AsyncCatalogProvider as the top-level catalog. In these cases it isn't clear what we should do with full or partial table references. For example, if the user adds an AsyncSchemaProvider and then tries to resolve the query "SELECT * FROM weston.public.my_table", what should we do? * We could just assume that all table references are intended for us. This works as long as this schema provider is the only provider registered. If multiple providers are registered, then we need some way to know which one to use for a given table reference. * We could assume that only bare references are intended for us and ignore any qualified (full or partial) table reference.
* Or we can require schema providers to supply their own schema name and their catalog name so that we can filter for the references that apply to them (this is what I do). The main problem with the current approach is that users whose top level is an AsyncCatalogProviderList have to implement these methods even though they are meaningless there (in that case the filtering is done by the higher-level resolve function). We should probably do whatever the synchronous planner does in this case, but I didn't know what that is. If I register a schema provider with a `SessionContext` and then call `sql` with a full (not bare) table reference, does it apply the provider I registered or not? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
