Hey folks.  I apologize if this isn't the right place, but I'm really
struggling to put together a working config/example that uses PyFlink with
our organization's managed Iceberg. There are bits and pieces of helpful
examples in the Flink, Iceberg, and PyFlink docs, but nothing I can get to
work with our setup.  Our datalake team uses a Postgres database for
managing metadata and an S3-compatible store for the files.

I'm struggling both with the overloaded terminology in the documentation
and with resolving jar file dependencies. It's unclear to me whether I need
to create a catalog in my PyFlink runtime and use that in queries, or
whether I can configure the connection in a different way so I can query
the existing catalogs/tables. It's also unclear, based on the errors I'm
getting, whether I have all the proper jar files in my Flink setup for use
with PyFlink. In my trial and error with the config, I seem to oscillate
between errors about missing classes in the underlying Java code and
errors about not finding the configured catalog/table.
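
For context, this is how I've been putting the jars on the classpath from
Python, right after creating the TableEnvironment in the full snippet
further down. The jar names and versions here are placeholders for whatever
is in my Flink lib dir, and part of my question is whether this is even the
right set:

# Jar names/versions below are placeholders; I'm not sure this is the
# right or complete set for an Iceberg catalog backed by JDBC + S3.
table_env.get_config().set(
    "pipeline.jars",
    ";".join([
        "file:///opt/flink/lib/iceberg-flink-runtime-1.17-1.4.3.jar",
        "file:///opt/flink/lib/iceberg-aws-bundle-1.4.3.jar",
        "file:///opt/flink/lib/postgresql-42.7.1.jar",
    ]),
)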

Has anyone on the list used PyFlink together with Iceberg and a JDBC
catalog implementation similar to ours?  Anyone know of useful example
PyFlink code that does something similar to the below? I'd love to pick
your brain.

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(environment_settings=env_settings)

# Register an Iceberg catalog whose metadata lives in our JDBC database
# and whose data files live in the S3-compatible store.
table_env.execute_sql("""
    CREATE CATALOG flink_iceberg WITH (
        'type'='iceberg',
        'catalog-impl'='org.apache.iceberg.jdbc.JdbcCatalog',
        'uri'='jdbc:comdb2://my-jdbc-connect-string',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
        'warehouse'='s3a://my-s3-bucket/'
    )
""")

table_env.use_catalog("flink_iceberg")

result = table_env.sql_query("SELECT * FROM prometheus_prometheus")
result.execute().print()
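
Once the catalog resolves, I'd expect sanity checks like the following to
work ('my_db' is a placeholder for whatever database the datalake team
actually created; I suspect the bare SELECT above also needs the current
database set explicitly):

# List what the catalog can see; the database name is a placeholder.
table_env.execute_sql("SHOW DATABASES").print()
table_env.use_database("my_db")
table_env.execute_sql("SHOW TABLES").print()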

Frank Gilroy
