Hey Frank,

Thanks for reaching out here. I spent some cycles a while ago to remove the
Hadoop requirement from Flink. There were a lot of APIs that needed to
change, which caused not to follow through with it. But this might help you
in getting PyFlink up and running since it contains an example similar to
what you're trying to do: https://github.com/apache/iceberg/pull/7369

Let me know if this helps.

Kind regards,
Fokko

Op di 9 apr 2024 om 20:38 schreef Frank <frankjgil...@gmail.com>:

> Hey folks.  I apologize if this isn't the place, but I'm really struggling
> to put together a proper config/example that utilizes pyflink and our
> organizations managed Iceberg. There are bits and pieces of helpful
> examples in the Flink, Iceberg and pyflink docs but nothing I can get to
> work with our setup.  Our datalake team uses a Postgress Database for
> managing metadata and an S3 compatible store for the files.
>
> I'm struggling both with the overloaded language used in the documentation
> and resolving jar file dependencies. Its unclear to me whether I need to
> create a catalog in my pyflink runtime and use that in queries, or whether
> I can configure the connection in a different way so I can query the
> existing catalogs/tables. Its also unclear to me based on the errors I'm
> getting, whether I have all the proper jar files in my flink setup for use
> with pyflink. In my trial and error with the config, I seem to
> oscillate between errors related to missing classes in the underlying java
> code, or errors related to not finding the configured catalog/table.
>
> Has anyone on the list used Pyflink together with iceberg and a jdbc
> catalog implementation similar to ours?  Anyone know of useful example
> pyflink code that does something similar to the below? I'd love to pick
> your brain.
>
> env_settings = EnvironmentSettings.in_streaming_mode()
>
> table_env = TableEnvironment.create(environment_settings=env_settings)
>
> table_env.execute_sql(f"""
>
>     CREATE CATALOG flink_iceberg WITH (
>
>         'type'='iceberg',
>
>         'connector'='iceberg',
>
>
> 'catalog-impl'='org.apache.flink.connector.jdbc.catalog.JdbcCatalog',
>
>         'uri'='jdbc:comdb2://my-jdbc-connect-string',
>
>          io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
>
>          'warehouse'='s3a://my-s3-bucket/'
>
>          )
>
> """)
>
>
> table_env.use_catalog("flink_iceberg")
>
> result = table_env.sql_query("SELECT * FROM prometheus_prometheus")
>
> result.execute().print()
>
> Frank Gilroy
>
>

Reply via email to