tespent commented on issue #1103: URL: https://github.com/apache/datafusion-python/issues/1103#issuecomment-2799841561
@timsaucer This is wonderful! However, I think the FFI CatalogProvider is not enough for my needs, since I'm looking for a *pure Python* CatalogProvider and SchemaProvider. In my case, I need to wrap Python-based data sources in a CatalogProvider, which would still require an extra non-Python layer even once the FFI interface exists (Python basically cannot expose a C interface, so it is not among the languages supported by the FFI). The critical point for me is being able to describe catalogs, schemas, and tables with Python logic, and I suppose this will require significant additional work beyond the FFI-based interface. Let me show some conceptual code for this: ```python class MyCatalogProvider(CatalogProvider): def __init__(self, **client_kwargs): self._client = ... def schema_names(self) -> list[str]: return self._client.list_databases(name_only=True) def schema(self, name: str) -> SchemaProvider | None: try: db = self._client.get_database(name) except FileNotFoundException: return None return MySchemaProvider(self._client, db) class MySchemaProvider(SchemaProvider): def __init__(self, client: SomeClient, database: MyDatabaseInClient): self._client = client self._database = database def table_names(self) -> list[str]: return self._client.list_tables(self._database, name_only=True) def table(self, name: str) -> TableProvider | None: try: tbl = self._client.get_table(self._database, name) except FileNotFoundException: return None return MyTableProvider(self._client, self._database, tbl) def table_exists(self, name: str) -> bool: return self._client.table_exists(self._database, name) class MyTableProvider(TableProvider): def __init__(self, client: SomeClient, schema: MyDatabaseInClient, table: MyTableInClient): self._client = client self._table = table def schema(self) -> pyarrow.Schema: return self._table.schema def scan(self, session: SessionInfo, projection: list[int] | None, filters: list[datafusion.Expr], limit: int | None) -> ExecutionPlan: plan = 
self._client.plan_read(self._table) ... # logics to plan the execution return MyPythonExec(plan, data_schema) def supports_filters_pushdown(self, filters: list[datafusion.Expr]) -> list[TableProviderFilterPushDown]: .... class MyPythonExec(ExecutionPlan): def __init__(self, ...): ... def __repr__(self) -> str: ... @property def schema(self) -> pyarrow.Schema: ... @property def num_partitions(self) -> int: ... def partition(self, id: int) -> Iterable[pyarrow.RecordBatch]: ... ctx = SessionContext() ctx.register_catalog("mine", MyCatalogProvider(...)) print(ctx.sql("select a,b from mine.db.tbl where a>10 and b < 20")) # preview the result for sql ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org