Hey folks, nice to meet y'all!
I'm trying to get the following stack up and running locally:

- Kafka as the source
- PyFlink SQL
- Iceberg on top of MinIO

The goal is a PyFlink script that reads data from a Kafka topic, does some transformations, and writes it into an Iceberg table. Everything except the PyFlink app itself runs in Docker, defined in a `docker-compose.yml`:

```yaml
version: "3.7"
services:
  mariadb:
    image: 'mariadb:latest'
    hostname: mariadb
    container_name: mariadb
    ports:
      - '3306:3306'
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: admin
      MYSQL_PASSWORD: admin
      MYSQL_DATABASE: metastore_db
    volumes:
      - ./mariadb-data:/var/lib/mysql
    networks:
      iceberg_net:

  hive-metastore:
    hostname: hive-metastore
    container_name: hive-metastore
    build:
      context: hive
    ports:
      - '9083:9083'
    environment:
      METASTORE_DB_HOSTNAME: mariadb
    depends_on:
      - mariadb
    networks:
      iceberg_net:

  minio:
    hostname: "minio"
    image: "minio/minio:latest"
    container_name: "minio"
    ports:
      - "9001:9001"
      - "9000:9000"
    command:
      - "server"
      - "/data"
      - "--console-address"
      - ":9001"
    volumes:
      - "minio:/data"
    environment:
      MINIO_ROOT_USER: "minio"
      MINIO_ROOT_PASSWORD: "minio123"
    networks:
      iceberg_net:
        aliases:
          - iceberg.minio

  mc:
    depends_on:
      - "minio"
    image: "minio/mc"
    container_name: "mc"
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/iceberg;
      /usr/bin/mc mb minio/iceberg;
      /usr/bin/mc policy set public minio/iceberg;
      tail -f /dev/null
      "
    networks:
      iceberg_net:

  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    networks:
      iceberg_net:

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    networks:
      iceberg_net:

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
      - controller
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      iceberg_net:

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - controller
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      iceberg_net:

networks:
  iceberg_net:

volumes:
  minio: null
```

My PyFlink program currently looks like this:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment


def demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.enable_checkpointing(1000)

    for jar in FLINK_JARS:
        print(f"Adding jar: {jar}")
        env.add_jars(f"file://{jar}")

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.execute_sql("""
        CREATE CATALOG hive_catalog WITH (
            'type'='iceberg',
            'catalog-type'='hive',
            'uri'='thrift://localhost:9083',
            'warehouse'='s3a://iceberg'
        )
    """)

    t_env.use_catalog("hive_catalog")

    t_env.execute_sql("""
        CREATE DATABASE IF NOT EXISTS iceberg
    """)
```

These jars are added to the streaming environment:

- aws-java-sdk-bundle-1.12.316.jar
- flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
- hive-metastore-3.1.3.jar
- flink-s3-fs-hadoop-1.16.1.jar
- flink-sql-connector-kafka-1.16.2.jar
- iceberg-flink-runtime-1.16-1.3.0.jar
- flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
- hadoop-aws-3.3.5.jar
- iceberg-hive-runtime-1.3.0.jar

The metastore runs with the following versions:

```
HADOOP_VERSION=3.3.5
METASTORE_VERSION=3.1.3
```

And finally, this is the error I am currently running into:

```
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CREATE DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog], databaseName: [iceberg], ignoreIfExists: [true])
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Failed to create namespace iceberg in Hive Metastore
	at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
	at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
	at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
	... 12 more
Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
	at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)
	at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
	... 15 more
```

I also add the Hadoop classpath to the environment before running the script via:

```shell
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
```

I'm not well versed in Java, but as far as I understand, including the `hadoop-aws` jar in the Flink runtime should provide the necessary classes to run this example, right?

I was looking for any pointers on where to go from here, or maybe existing examples of similar setups (I couldn't find any).

Best,
Dani
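PS: `FLINK_JARS` isn't defined in the snippet above; it's just the absolute paths of the jars listed earlier, collected from a local directory, roughly like this (the `jars/` directory name is whatever folder holds the connector jars on your machine):

```python
from pathlib import Path


def collect_flink_jars(jar_dir: str = "jars") -> list[str]:
    """Collect absolute paths of all jars in `jar_dir`, sorted, so they can
    be passed to env.add_jars() as file:// URLs."""
    return sorted(str(p.resolve()) for p in Path(jar_dir).glob("*.jar"))


FLINK_JARS = collect_flink_jars()
```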
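PS2: To sanity-check the assumption that one of these jars actually ships the missing `S3AFileSystem` class, a small helper like the following could scan a jar directory for a given class file (the directory path is a placeholder, and jars are just zip archives, so `zipfile` works):

```python
import zipfile
from pathlib import Path


def jars_containing(jar_dir: str, class_name: str) -> list[str]:
    """Return the names of jars in `jar_dir` that bundle `class_name`,
    e.g. 'org.apache.hadoop.fs.s3a.S3AFileSystem'."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```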