alamb commented on code in PR #13800:
URL: https://github.com/apache/datafusion/pull/13800#discussion_r1905333290


##########
datafusion/catalog/src/async.rs:
##########
@@ -0,0 +1,747 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
+use datafusion_execution::config::SessionConfig;
+
+use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, 
TableProvider};
+
+/// A schema provider that looks up tables in a cache
+///
+/// Instances are created by the [`AsyncSchemaProvider::resolve`] method
+#[derive(Debug)]
+struct ResolvedSchemaProvider {
+    owner_name: Option<String>,
+    cached_tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+#[async_trait]
+impl SchemaProvider for ResolvedSchemaProvider {
+    fn owner_name(&self) -> Option<&str> {
+        self.owner_name.as_deref()
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.cached_tables.keys().cloned().collect()
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        Ok(self.cached_tables.get(name).cloned())
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        _table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        not_impl_err!(
+            "Attempt to register table '{name}' with ResolvedSchemaProvider 
which is not supported"
+        )
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        not_impl_err!("Attempt to deregister table '{name}' with 
ResolvedSchemaProvider which is not supported")
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.cached_tables.contains_key(name)
+    }
+}
+
+/// Helper class for building a [`ResolvedSchemaProvider`]
+struct ResolvedSchemaProviderBuilder {
+    owner_name: String,
+    async_provider: Arc<dyn AsyncSchemaProvider>,
+    cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>,
+}
+impl ResolvedSchemaProviderBuilder {
+    fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) 
-> Self {
+        Self {
+            owner_name,
+            async_provider,
+            cached_tables: HashMap::new(),
+        }
+    }
+
+    async fn resolve_table(&mut self, table_name: &str) -> Result<()> {
+        if !self.cached_tables.contains_key(table_name) {
+            let resolved_table = self.async_provider.table(table_name).await?;
+            self.cached_tables
+                .insert(table_name.to_string(), resolved_table);
+        }
+        Ok(())
+    }
+
+    fn finish(self) -> Arc<dyn SchemaProvider> {
+        let cached_tables = self
+            .cached_tables
+            .into_iter()
+            .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, 
value)))
+            .collect();
+        Arc::new(ResolvedSchemaProvider {
+            owner_name: Some(self.owner_name),
+            cached_tables,
+        })
+    }
+}
+
+/// A catalog provider that looks up schemas in a cache
+///
+/// Instances are created by the [`AsyncCatalogProvider::resolve`] method
+#[derive(Debug)]
+struct ResolvedCatalogProvider {
+    cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
+}
+impl CatalogProvider for ResolvedCatalogProvider {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.cached_schemas.keys().cloned().collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.cached_schemas.get(name).cloned()
+    }
+}
+
+/// Helper class for building a [`ResolvedCatalogProvider`]
+struct ResolvedCatalogProviderBuilder {
+    cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>,
+    async_provider: Arc<dyn AsyncCatalogProvider>,
+}
+impl ResolvedCatalogProviderBuilder {
+    fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self {
+        Self {
+            cached_schemas: HashMap::new(),
+            async_provider,
+        }
+    }
+    fn finish(self) -> Arc<dyn CatalogProvider> {
+        let cached_schemas = self
+            .cached_schemas
+            .into_iter()
+            .filter_map(|(key, maybe_value)| {
+                maybe_value.map(|value| (key, value.finish()))
+            })
+            .collect();
+        Arc::new(ResolvedCatalogProvider { cached_schemas })
+    }
+}
+
+/// A catalog provider list that looks up catalogs in a cache
+///
+/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method
+#[derive(Debug)]
+struct ResolvedCatalogProviderList {
+    cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
+}
+impl CatalogProviderList for ResolvedCatalogProviderList {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        _name: String,
+        _catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        unimplemented!("resolved providers cannot handle registration APIs")
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        self.cached_catalogs.keys().cloned().collect()
+    }
+
+    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        self.cached_catalogs.get(name).cloned()
+    }
+}
+
+/// A trait for schema providers that must resolve tables asynchronously
+///
+/// The [`SchemaProvider::table`] method _is_ asynchronous.  However, this is 
primarily for convenience and
+/// it is not a good idea for this method to be slow as this will cause poor 
planning performance.
+///
+/// It is a better idea to resolve the tables once and cache them in memory 
for the duration of
+/// planning.  This trait helps implement that pattern.
+///
+/// After implementing this trait you can call the 
[`AsyncSchemaProvider::resolve`] method to get an
+/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced 
tables.  The `resolve`
+/// method can be slow and asynchronous as it is only called once, before 
planning.

Review Comment:
   Maybe we can add a reference to the example, like this (this can be done in a follow-on PR as well):
   
   ```suggestion
   /// method can be slow and asynchronous as it is only called once, before planning.
   ///
   /// See [remote_catalog.rs] for a full end-to-end example.
   ///
   /// [remote_catalog.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
   ```



##########
datafusion-examples/examples/remote_catalog.rs:
##########
@@ -91,27 +83,26 @@ async fn main() -> Result<()> {
     // `remote_schema.remote_table`)
     let references = state.resolve_table_references(&statement)?;
 
-    // Call `load_tables` to load information from the remote catalog for each
-    // of the referenced tables. Best practice is to fetch the the information
-    // for all tables required by the query once (rather than one per table) to
-    // minimize network overhead
-    let table_names = references.iter().filter_map(|r| {
-        if refers_to_schema("datafusion", "remote_schema", r) {
-            Some(r.table())
-        } else {
-            None
-        }
-    });
-    remote_schema
-        .as_any()
-        .downcast_ref::<RemoteSchema>()
-        .expect("correct types")
-        .load_tables(table_names)
+    // Now we can asynchronously resolve the table references to get a cached 
catalog
+    // that we can use for our query
+    let resolved_catalog = remote_catalog_adapter
+        .resolve(&references, state.config(), "datafusion", "remote_schema")
         .await?;
 
-    // Now continue planing the query after having fetched the remote table and
-    // it can run as normal
-    let plan = state.statement_to_plan(statement).await?;
+    // This resolved catalog only makes sense for this query and so we create 
a clone

Review Comment:
   this is a nice API 👍 



##########
datafusion/catalog/src/async.rs:
##########
@@ -0,0 +1,764 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_common::{
+    error::{DataFusionError, Result},
+    HashMap, TableReference,
+};
+use datafusion_execution::config::SessionConfig;
+
+use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, 
TableProvider};
+
+/// A schema provider that looks up tables in a cache
+///
+/// This is created by the [`AsyncSchemaProvider::resolve`] method
+#[derive(Debug)]
+struct ResolvedSchemaProvider {
+    owner_name: Option<String>,
+    cached_tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+#[async_trait]
+impl SchemaProvider for ResolvedSchemaProvider {
+    fn owner_name(&self) -> Option<&str> {
+        self.owner_name.as_deref()
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.cached_tables.keys().cloned().collect()
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        Ok(self.cached_tables.get(name).cloned())
+    }
+
+    #[allow(unused_variables)]
+    fn register_table(
+        &self,
+        name: String,
+        _table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        Err(DataFusionError::Execution(format!(
+            "Attempt to register table '{name}' with ResolvedSchemaProvider 
which is not supported"
+        )))
+    }
+
+    #[allow(unused_variables)]
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        Err(DataFusionError::Execution(format!("Attempt to deregister table 
'{name}' with ResolvedSchemaProvider which is not supported")))
+    }
+
+    fn table_exist(&self, name: &str) -> bool {

Review Comment:
   Maybe we can consider a rename in a future, separate PR, but I agree we shouldn't mess with the `SchemaProvider` trait in this PR.



##########
datafusion/catalog/src/async.rs:
##########
@@ -0,0 +1,747 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_common::{error::Result, not_impl_err, HashMap, TableReference};
+use datafusion_execution::config::SessionConfig;
+
+use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, 
TableProvider};
+
+/// A schema provider that looks up tables in a cache
+///
+/// Instances are created by the [`AsyncSchemaProvider::resolve`] method
+#[derive(Debug)]
+struct ResolvedSchemaProvider {
+    owner_name: Option<String>,
+    cached_tables: HashMap<String, Arc<dyn TableProvider>>,
+}
+#[async_trait]
+impl SchemaProvider for ResolvedSchemaProvider {
+    fn owner_name(&self) -> Option<&str> {
+        self.owner_name.as_deref()
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.cached_tables.keys().cloned().collect()
+    }
+
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        Ok(self.cached_tables.get(name).cloned())
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        _table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        not_impl_err!(
+            "Attempt to register table '{name}' with ResolvedSchemaProvider 
which is not supported"
+        )
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        not_impl_err!("Attempt to deregister table '{name}' with 
ResolvedSchemaProvider which is not supported")
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.cached_tables.contains_key(name)
+    }
+}
+
+/// Helper class for building a [`ResolvedSchemaProvider`]
+struct ResolvedSchemaProviderBuilder {
+    owner_name: String,
+    async_provider: Arc<dyn AsyncSchemaProvider>,
+    cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>,
+}
+impl ResolvedSchemaProviderBuilder {
+    fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) 
-> Self {
+        Self {
+            owner_name,
+            async_provider,
+            cached_tables: HashMap::new(),
+        }
+    }
+
+    async fn resolve_table(&mut self, table_name: &str) -> Result<()> {
+        if !self.cached_tables.contains_key(table_name) {
+            let resolved_table = self.async_provider.table(table_name).await?;
+            self.cached_tables
+                .insert(table_name.to_string(), resolved_table);
+        }
+        Ok(())
+    }
+
+    fn finish(self) -> Arc<dyn SchemaProvider> {
+        let cached_tables = self
+            .cached_tables
+            .into_iter()
+            .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, 
value)))
+            .collect();
+        Arc::new(ResolvedSchemaProvider {
+            owner_name: Some(self.owner_name),
+            cached_tables,
+        })
+    }
+}
+
+/// A catalog provider that looks up schemas in a cache
+///
+/// Instances are created by the [`AsyncCatalogProvider::resolve`] method
+#[derive(Debug)]
+struct ResolvedCatalogProvider {
+    cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>,
+}
+impl CatalogProvider for ResolvedCatalogProvider {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.cached_schemas.keys().cloned().collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.cached_schemas.get(name).cloned()
+    }
+}
+
+/// Helper class for building a [`ResolvedCatalogProvider`]
+struct ResolvedCatalogProviderBuilder {
+    cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>,
+    async_provider: Arc<dyn AsyncCatalogProvider>,
+}
+impl ResolvedCatalogProviderBuilder {
+    fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self {
+        Self {
+            cached_schemas: HashMap::new(),
+            async_provider,
+        }
+    }
+    fn finish(self) -> Arc<dyn CatalogProvider> {
+        let cached_schemas = self
+            .cached_schemas
+            .into_iter()
+            .filter_map(|(key, maybe_value)| {
+                maybe_value.map(|value| (key, value.finish()))
+            })
+            .collect();
+        Arc::new(ResolvedCatalogProvider { cached_schemas })
+    }
+}
+
+/// A catalog provider list that looks up catalogs in a cache
+///
+/// Instances are created by the [`AsyncCatalogProviderList::resolve`] method
+#[derive(Debug)]
+struct ResolvedCatalogProviderList {
+    cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>,
+}
+impl CatalogProviderList for ResolvedCatalogProviderList {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        _name: String,
+        _catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        unimplemented!("resolved providers cannot handle registration APIs")
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        self.cached_catalogs.keys().cloned().collect()
+    }
+
+    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        self.cached_catalogs.get(name).cloned()
+    }
+}
+
+/// A trait for schema providers that must resolve tables asynchronously
+///
+/// The [`SchemaProvider::table`] method _is_ asynchronous.  However, this is 
primarily for convenience and
+/// it is not a good idea for this method to be slow as this will cause poor 
planning performance.
+///
+/// It is a better idea to resolve the tables once and cache them in memory 
for the duration of
+/// planning.  This trait helps implement that pattern.
+///
+/// After implementing this trait you can call the 
[`AsyncSchemaProvider::resolve`] method to get an
+/// `Arc<dyn SchemaProvider>` that contains a cached copy of the referenced 
tables.  The `resolve`
+/// method can be slow and asynchronous as it is only called once, before 
planning.
+#[async_trait]
+pub trait AsyncSchemaProvider: Send + Sync {
+    /// Lookup a table in the schema provider
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>>;
+    /// Creates a cached provider that can be used to execute a query 
containing given references
+    ///
+    /// This method will walk through the references and look them up once, 
creating a cache of table
+    /// providers.  This cache will be returned as a synchronous TableProvider 
that can be used to plan
+    /// and execute a query containing the given references.
+    ///
+    /// This cache is intended to be short-lived for the execution of a single 
query.  There is no mechanism
+    /// for refresh or eviction of stale entries.

Review Comment:
   Maybe we can add a reference to the example, like this (this can be done in a follow-on PR as well):
   
   ```suggestion
       /// for refresh or eviction of stale entries.
       ///
       /// See [remote_catalog.rs] for a full end-to-end example.
       ///
       /// [remote_catalog.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to