Hey folks. I apologize if this isn't the right place, but I'm really struggling to put together a working config/example that uses PyFlink with our organization's managed Iceberg. There are bits and pieces of helpful examples in the Flink, Iceberg, and PyFlink docs, but nothing I can get to work with our setup. Our datalake team uses a Postgres database for managing metadata and an S3-compatible store for the files.
I'm struggling both with the overloaded terminology in the documentation and with resolving jar file dependencies. It's unclear to me whether I need to create a catalog in my PyFlink runtime and use that in queries, or whether I can configure the connection a different way so I can query the existing catalogs/tables. It's also unclear, from the errors I'm getting, whether my Flink setup has all the jar files it needs for use with PyFlink. In my trial and error with the config, I seem to oscillate between errors about missing classes in the underlying Java code and errors about the configured catalog/table not being found.

Has anyone on the list used PyFlink together with Iceberg and a JDBC catalog implementation similar to ours? Does anyone know of useful example PyFlink code that does something like the below? I'd love to pick your brain.

    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(environment_settings=env_settings)

    # Register the existing Iceberg catalog. 'catalog-impl' should point at
    # Iceberg's JDBC catalog (org.apache.iceberg.jdbc.JdbcCatalog), not the
    # Flink JDBC connector's catalog class; credentials can be passed via
    # 'jdbc.user' / 'jdbc.password' if needed.
    table_env.execute_sql("""
        CREATE CATALOG flink_iceberg WITH (
            'type' = 'iceberg',
            'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
            'uri' = 'jdbc:comdb2://my-jdbc-connect-string',
            'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
            'warehouse' = 's3a://my-s3-bucket/'
        )
    """)

    table_env.use_catalog("flink_iceberg")
    result = table_env.sql_query("SELECT * FROM prometheus_prometheus")
    result.execute().print()

Frank Gilroy
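P.S. For completeness, this is roughly how I've been trying to resolve the jar dependencies before creating the TableEnvironment. The jar names and versions below are just my best guesses at what's required (the iceberg-flink-runtime jar, the Iceberg AWS bundle, and a JDBC driver for the metastore database), so treat it as a sketch of my current attempt rather than a known-good setup:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(environment_settings=env_settings)

    # 'pipeline.jars' takes a semicolon-separated list of file:// URLs and
    # ships those jars with the job.
    table_env.get_config().set(
        "pipeline.jars",
        ";".join([
            "file:///opt/flink/lib/iceberg-flink-runtime-1.17-1.4.3.jar",
            "file:///opt/flink/lib/iceberg-aws-bundle-1.4.3.jar",
            # driver for whatever database backs the JDBC catalog
            "file:///opt/flink/lib/postgresql-42.7.1.jar",
        ]),
    )

Part of my confusion is whether jars passed this way are enough, or whether they also need to be dropped into $FLINK_HOME/lib on the cluster.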