tespent commented on issue #1103:
URL: https://github.com/apache/datafusion-python/issues/1103#issuecomment-2799841561

   @timsaucer This is wonderful! However, I don't think an FFI CatalogProvider is enough for my needs, since I'm looking for a *pure Python* CatalogProvider and SchemaProvider. In my case, I need to wrap Python-based data sources in a CatalogProvider. That still requires an extra non-Python layer even once the FFI interface exists, because Python basically cannot expose a C interface and is therefore not among the languages FFI supports.
   
   The critical point for me is being able to describe catalogs, schemas and tables with Python logic, and I suppose this will require considerable additional work beyond the FFI-based interface. Here is some conceptual code to illustrate:
   
   ```python
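   from __future__ import annotations

   from typing import Iterable

   import datafusion
   import pyarrow
   from datafusion import SessionContext

   # CatalogProvider, SchemaProvider, TableProvider, ExecutionPlan, SessionInfo and
   # TableProviderFilterPushDown are the proposed pure-Python interfaces;
   # SomeClient, MyDatabaseInClient and MyTableInClient are the data source's client types.


   class MyCatalogProvider(CatalogProvider):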
       def __init__(self, **client_kwargs):
           self._client = ...
   
       def schema_names(self) -> list[str]:
           return self._client.list_databases(name_only=True)
   
       def schema(self, name: str) -> SchemaProvider | None:
           try:
               db = self._client.get_database(name)
           except FileNotFoundError:
               return None
           return MySchemaProvider(self._client, db)
   
   
   class MySchemaProvider(SchemaProvider):
       def __init__(self, client: SomeClient, database: MyDatabaseInClient):
           self._client = client
           self._database = database
   
       def table_names(self) -> list[str]:
           return self._client.list_tables(self._database, name_only=True)
   
       def table(self, name: str) -> TableProvider | None:
           try:
               tbl = self._client.get_table(self._database, name)
           except FileNotFoundError:
               return None
           return MyTableProvider(self._client, self._database, tbl)
   
       def table_exists(self, name: str) -> bool:
           return self._client.table_exists(self._database, name)
   
   
   class MyTableProvider(TableProvider):
       def __init__(
           self, client: SomeClient, schema: MyDatabaseInClient, table: MyTableInClient
       ):
           self._client = client
           self._table = table
   
       def schema(self) -> pyarrow.Schema:
           return self._table.schema
   
       def scan(
           self,
           session: SessionInfo,
           projection: list[int] | None,
           filters: list[datafusion.Expr],
           limit: int | None,
       ) -> ExecutionPlan:
           plan = self._client.plan_read(self._table)
           ...  # logic to plan the execution
           return MyPythonExec(plan, data_schema)
   
       def supports_filters_pushdown(
           self, filters: list[datafusion.Expr]
       ) -> list[TableProviderFilterPushDown]:
           ...
   
   
   class MyPythonExec(ExecutionPlan):
       def __init__(self, plan, data_schema: pyarrow.Schema):
           ...
   
       def __repr__(self) -> str:
           ...
   
       @property
       def schema(self) -> pyarrow.Schema:
           ...
   
       @property
       def num_partitions(self) -> int:
           ...
   
       def partition(self, id: int) -> Iterable[pyarrow.RecordBatch]:
           ...
   
   
   ctx = SessionContext()
   ctx.register_catalog("mine", MyCatalogProvider(...))
   # preview the result of the SQL query
   print(ctx.sql("select a, b from mine.db.tbl where a > 10 and b < 20"))
   ```
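The proposal's lookup-then-scan flow can be tried without DataFusion at all. The sketch below is minimal and uses only the standard library, with these assumptions: the ABCs are hypothetical, pared-down versions of the proposed `CatalogProvider`, `SchemaProvider`, `TableProvider` and `ExecutionPlan` (not datafusion-python's API), rows are plain tuples rather than `pyarrow.RecordBatch`, and the hypothetical `resolve` and `collect` helpers play the part the SQL planner and executor would take for `select a, b from mine.db.tbl where a > 10 and b < 20`.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Iterator


# Hypothetical, simplified interfaces; a "batch" is a list of row tuples, not a pyarrow.RecordBatch.
class ExecutionPlan(ABC):
    @property
    @abstractmethod
    def num_partitions(self) -> int: ...

    @abstractmethod
    def partition(self, id: int) -> Iterator[list[tuple]]: ...


class TableProvider(ABC):
    @abstractmethod
    def scan(self, projection: list[int] | None, limit: int | None) -> ExecutionPlan: ...


class SchemaProvider(ABC):
    @abstractmethod
    def table(self, name: str) -> TableProvider | None: ...


class CatalogProvider(ABC):
    @abstractmethod
    def schema(self, name: str) -> SchemaProvider | None: ...


class MemoryExec(ExecutionPlan):
    def __init__(self, partitions: list[list[tuple]], projection: list[int] | None, limit: int | None):
        self._partitions = partitions
        self._projection = projection
        self._limit = limit

    @property
    def num_partitions(self) -> int:
        return len(self._partitions)

    def partition(self, id: int) -> Iterator[list[tuple]]:
        rows = self._partitions[id]
        if self._limit is not None:
            rows = rows[: self._limit]  # per-partition cap only; the engine applies the global limit
        if self._projection is not None:
            rows = [tuple(row[i] for i in self._projection) for row in rows]
        yield rows


class MemoryTable(TableProvider):
    def __init__(self, partitions: list[list[tuple]]):
        self._partitions = partitions

    def scan(self, projection: list[int] | None, limit: int | None) -> ExecutionPlan:
        return MemoryExec(self._partitions, projection, limit)


class MemorySchema(SchemaProvider):
    def __init__(self, tables: dict[str, list[list[tuple]]]):
        self._tables = tables

    def table(self, name: str) -> TableProvider | None:
        partitions = self._tables.get(name)
        return None if partitions is None else MemoryTable(partitions)  # unknown table -> None


class MemoryCatalog(CatalogProvider):
    def __init__(self, databases: dict[str, dict[str, list[list[tuple]]]]):
        self._databases = databases

    def schema(self, name: str) -> SchemaProvider | None:
        tables = self._databases.get(name)
        return None if tables is None else MemorySchema(tables)  # unknown database -> None


def resolve(catalogs: dict[str, CatalogProvider], name: str) -> TableProvider | None:
    # Walk catalog -> schema -> table for a dotted name such as "mine.db.tbl".
    catalog_name, schema_name, table_name = name.split(".")
    catalog = catalogs.get(catalog_name)
    schema = catalog.schema(schema_name) if catalog is not None else None
    return schema.table(table_name) if schema is not None else None


def collect(plan: ExecutionPlan) -> list[tuple]:
    # Drain every partition in order and flatten the batches into rows.
    return [row for i in range(plan.num_partitions) for batch in plan.partition(i) for row in batch]


# Hypothetical data: table "tbl" with columns (a, b, c), split into two partitions.
DATA = {"db": {"tbl": [[(5, 1, "x"), (11, 15, "y")], [(12, 30, "z"), (20, 19, "w")]]}}
catalogs = {"mine": MemoryCatalog(DATA)}
table = resolve(catalogs, "mine.db.tbl")
plan = table.scan(projection=[0, 1], limit=None)  # select a, b
rows = collect(plan)
matches = [(a, b) for a, b in rows if a > 10 and b < 20]  # where a > 10 and b < 20
print(matches)  # [(11, 15), (20, 19)]
```

This sketch has no filter pushdown, so the `where` predicate is evaluated after the scan. A real `supports_filters_pushdown` could instead report which predicates the data source applies exactly, so the engine would not need to re-apply them.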

