Thanks for the tips, Martijn! I've pinned the library versions to 1.16 everywhere and also decided to scrap pyflink and go for the sql-client instead, to keep things simpler for now.
This is the Dockerfile I am using for both the *jobmanager* and the *sql-client*:

FROM flink:1.16.2-scala_2.12-java11

RUN APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/ \
    && HADOOP_VERSION=3.3.5 \
    && wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz \
    && tar xzvf hadoop-${HADOOP_VERSION}.tar.gz \
    && HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}

ENV HADOOP_CLASSPATH /opt/flink/hadoop-3.3.5/etc/hadoop:/opt/flink/hadoop-3.3.5/share/hadoop/common/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/common/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/*:/opt/flink/hadoop-3.3.5/share/hadoop/mapreduce/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/*

COPY lib/flink-json-1.16.1.jar /opt/flink/lib/
COPY lib/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar /opt/flink/lib/
COPY lib/flink-sql-connector-kafka-1.16.2.jar /opt/flink/lib/
COPY lib/iceberg-flink-runtime-1.16-1.3.0.jar /opt/flink/lib/
COPY lib/iceberg-hive-runtime-1.3.0.jar /opt/flink/lib/
COPY lib/hive-metastore-3.1.3.jar /opt/flink/lib/
COPY lib/hadoop-aws-3.3.5.jar /opt/flink/lib/
COPY lib/aws-java-sdk-bundle-1.12.316.jar /opt/flink/lib/
COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/

WORKDIR /opt/flink

I start the sql-client via */opt/flink/bin/sql-client.sh embedded*.

I am able to create a sink table with the Iceberg connector using the following query:

create table if not exists clicks_ib (
  `timestamp` STRING,
  event STRING,
  user_id STRING,
  site_id STRING,
  url STRING,
  on_site_seconds INT,
  viewed_percent INT
) with (
  'connector'='iceberg',
  'catalog-name'='hive_catalog',
  'uri'='thrift://hivemetastore:9083',
  'warehouse'='s3a://iceberg'
);

But when I try to select from it, I run into the following error:

Flink SQL> select * from default_catalog.default_database.clicks_ib;
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.RuntimeException:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I feel like I'm still a little confused about where to place which jars, but I'm not exactly sure what is missing. Two things I'm planning to try are sketched below.
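One thing I noticed while re-reading the plugins page you linked: if I understand it correctly, each filesystem plugin is supposed to sit in its own subdirectory under /opt/flink/plugins rather than directly in plugins/, so the last COPY line in my Dockerfile may be off. Roughly something like this (an untested sketch, reusing the jar name from the Dockerfile above; the folder name is just my own choice):

# Untested sketch: give the S3 filesystem plugin its own folder under plugins/,
# as the filesystem plugins documentation describes.
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop
COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/s3-fs-hadoop/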
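I also realized that the HADOOP_HOME I set inside the RUN step only exists for that one shell and doesn't persist into the final image, so at the moment only the hard-coded HADOOP_CLASSPATH is doing anything. A small variant I'm considering (again just a sketch, path taken from the classpath above, not verified yet):

# Untested sketch: declare HADOOP_HOME at the image level instead of as a
# throwaway shell variable inside RUN; the classpath could then be derived at
# container start with: export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
ENV HADOOP_HOME=/opt/flink/hadoop-3.3.5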
For reference, I'll paste the current full docker-compose.yml below.

version: "3.7"
services:

  sqlclient:
    container_name: sqlclient
    build: .
    command:
      - /opt/flink/bin/sql-client.sh
      - embedded
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    volumes:
      - ./flink-sql:/etc/sql

  jobmanager:
    build: .
    hostname: "jobmanager"
    container_name: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1

  taskmanager:
    image: flink:1.16.2-scala_2.12-java11
    hostname: "taskmanager"
    container_name: "taskmanager"
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1

  mariadb:
    image: 'mariadb:latest'
    hostname: mariadb
    container_name: mariadb
    ports:
      - '3306:3306'
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: admin
      MYSQL_PASSWORD: admin
      MYSQL_DATABASE: metastore_db
    volumes:
      - ./mariadb-data:/var/lib/mysql

  hivemetastore:
    hostname: hivemetastore
    container_name: hivemetastore
    build:
      context: hive
    ports:
      - '9083:9083'
    environment:
      METASTORE_DB_HOSTNAME: mariadb
    depends_on:
      - mariadb

  minio:
    hostname: "minio"
    image: "minio/minio:latest"
    container_name: "minio"
    ports:
      - "9001:9001"
      - "9000:9000"
    command:
      - "server"
      - "/data"
      - "--console-address"
      - ":9001"
    volumes:
      - "minio:/data"
    environment:
      MINIO_ROOT_USER: "minio"
      MINIO_ROOT_PASSWORD: "minio123"
    networks:
      default:
        aliases:
          - iceberg.minio

  mc:
    depends_on:
      - "minio"
    image: "minio/mc"
    container_name: "mc"
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..." && sleep 1; done;
      /usr/bin/mc rm -r --force minio/iceberg;
      /usr/bin/mc mb minio/iceberg;
      /usr/bin/mc policy set public minio/iceberg;
      tail -f /dev/null
      "

  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

#  schema-registry:
#    image: confluentinc/cp-schema-registry:7.4.0
#    hostname: schema-registry
#    container_name: schema-registry
#    depends_on:
#      - broker
#      - controller
#    ports:
#      - "8081:8081"
#    environment:
#      SCHEMA_REGISTRY_HOST_NAME: schema-registry
#      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
#      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - controller
#      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
#      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

volumes:
  minio: null

networks:
  default:
    name: flinkberg

Best,
Dani

On Thu, Jun 29, 2023 at 9:06 AM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Dani,
>
> There are two things that I notice:
>
> 1. You're mixing different Flink versions (1.16 and 1.17): all Flink
> artifacts should be from the same Flink version
> 2. S3 plugins need to be added to the plugins folder of Flink, because
> they are loaded via the plugin mechanism. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
> Best regards,
>
> Martijn
>
> On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma <dani...@gmail.com> wrote:
>
>> Hey folks,
>>
>> Nice to meet ya'll!
>>
>> I'm trying to get the following stack up and running locally:
>>
>> - Kafka as source
>> - pyFlink SQL
>> - Iceberg on top of MinIO
>>
>> The goal is to have a pyflink script that reads data from a Kafka topic,
>> does some transformations, and dumps it into an iceberg table.
>>
>> I have everything, except for the pyflink app running in Docker, defined
>> in a docker-compose.yml:
>>
>> version: "3.7"
>> services:
>>
>>   mariadb:
>>     image: 'mariadb:latest'
>>     hostname: mariadb
>>     container_name: mariadb
>>     ports:
>>       - '3306:3306'
>>     environment:
>>       MYSQL_ROOT_PASSWORD: admin
>>       MYSQL_USER: admin
>>       MYSQL_PASSWORD: admin
>>       MYSQL_DATABASE: metastore_db
>>     volumes:
>>       - ./mariadb-data:/var/lib/mysql
>>     networks:
>>       iceberg_net:
>>
>>   hive-metastore:
>>     hostname: hive-metastore
>>     container_name: hive-metastore
>>     build:
>>       context: hive
>>     ports:
>>       - '9083:9083'
>>     environment:
>>       METASTORE_DB_HOSTNAME: mariadb
>>     depends_on:
>>       - mariadb
>>     networks:
>>       iceberg_net:
>>
>>   minio:
>>     hostname: "minio"
>>     image: "minio/minio:latest"
>>     container_name: "minio"
>>     ports:
>>       - "9001:9001"
>>       - "9000:9000"
>>     command:
>>       - "server"
>>       - "/data"
>>       - "--console-address"
>>       - ":9001"
>>     volumes:
>>       - "minio:/data"
>>     environment:
>>       MINIO_ROOT_USER: "minio"
>>       MINIO_ROOT_PASSWORD: "minio123"
>>     networks:
>>       iceberg_net:
>>         aliases:
>>           - iceberg.minio
>>
>>   mc:
>>     depends_on:
>>       - "minio"
>>     image: "minio/mc"
>>     container_name: "mc"
>>     entrypoint: >
>>       /bin/sh -c "
>>       until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..."
>>       && sleep 1; done;
>>       /usr/bin/mc rm -r --force minio/iceberg;
>>       /usr/bin/mc mb minio/iceberg;
>>       /usr/bin/mc policy set public minio/iceberg;
>>       tail -f /dev/null
>>       "
>>     networks:
>>       iceberg_net:
>>
>>   broker:
>>     image: confluentinc/cp-kafka:7.4.0
>>     hostname: broker
>>     container_name: broker
>>     depends_on:
>>       - controller
>>     ports:
>>       - "9092:9092"
>>       - "9101:9101"
>>     environment:
>>       KAFKA_NODE_ID: 1
>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>       KAFKA_JMX_PORT: 9101
>>       KAFKA_JMX_HOSTNAME: localhost
>>       KAFKA_PROCESS_ROLES: 'broker'
>>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>       KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>     networks:
>>       iceberg_net:
>>
>>   controller:
>>     image: confluentinc/cp-kafka:7.4.0
>>     hostname: controller
>>     container_name: controller
>>     ports:
>>       - "9093:9093"
>>       - "9102:9102"
>>     environment:
>>       KAFKA_NODE_ID: 2
>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>       KAFKA_JMX_PORT: 9102
>>       KAFKA_JMX_HOSTNAME: localhost
>>       KAFKA_PROCESS_ROLES: 'controller'
>>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>       KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>       KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>     networks:
>>       iceberg_net:
>>
>>   schema-registry:
>>     image: confluentinc/cp-schema-registry:7.4.0
>>     hostname: schema-registry
>>     container_name: schema-registry
>>     depends_on:
>>       - broker
>>       - controller
>>     ports:
>>       - "8081:8081"
>>     environment:
>>       SCHEMA_REGISTRY_HOST_NAME: schema-registry
>>       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>>       SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>>     networks:
>>       iceberg_net:
>>
>>   control-center:
>>     image: confluentinc/cp-enterprise-control-center:7.4.0
>>     hostname: control-center
>>     container_name: control-center
>>     depends_on:
>>       - broker
>>       - controller
>>       - schema-registry
>>     ports:
>>       - "9021:9021"
>>     environment:
>>       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>>       CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
>>       CONTROL_CENTER_REPLICATION_FACTOR: 1
>>       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>>       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>>       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
>>       PORT: 9021
>>     networks:
>>       iceberg_net:
>>
>> networks:
>>   iceberg_net:
>>
>> volumes:
>>   minio: null
>>
>>
>> My PyFlink program looks like this currently:
>>
>> def demo():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     env.enable_checkpointing(1000)
>>
>>     for jar in FLINK_JARS:
>>         print(f"Adding jar: {jar}")
>>         env.add_jars(f"file://{jar}")
>>
>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>>     t_env.execute_sql("""
>>         CREATE CATALOG hive_catalog WITH (
>>             'type'='iceberg',
>>             'catalog-type'='hive',
>>             'uri'='thrift://localhost:9083',
>>             'warehouse'='s3a://iceberg'
>>         )
>>     """)
>>
>>     t_env.use_catalog("hive_catalog")
>>
>>     t_env.execute_sql("""
>>         create database if not exists iceberg
>>     """)
>>
>> These jars are added to the streaming environment:
>>
>> - aws-java-sdk-bundle-1.12.316.jar
>> - flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
>> - hive-metastore-3.1.3.jar
>> - flink-s3-fs-hadoop-1.16.1.jar
>> - flink-sql-connector-kafka-1.16.2.jar
>> - iceberg-flink-runtime-1.16-1.3.0.jar
>> - flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
>> - hadoop-aws-3.3.5.jar
>> - iceberg-hive-runtime-1.3.0.jar
>>
>> The metastore operates with the following versions:
>>
>> HADOOP_VERSION=3.3.5
>> METASTORE_VERSION=3.1.3
>>
>> And finally, this is the error I am currently running into:
>>
>> pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CREATE
>> DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog], databaseName: [iceberg], ignoreIfExists: [true])
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>     at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.RuntimeException: Failed to create namespace iceberg in Hive Metastore
>>     at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
>>     at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
>>     at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
>>     ... 12 more
>> Caused by: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
>> org.apache.hadoop.fs.s3a.S3AFileSystem not found)
>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
>>     at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>     at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
>>     at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)
>>     at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
>>     at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
>>     at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
>>     at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
>>     at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
>>     ... 15 more
>>
>> I also have the HADOOP classpath added to the environment before running the script via:
>> export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
>>
>> I'm not well versed in Java but as far as I understand including the `hadoop-aws` jar in the flink runtime
>> should provide the necessary classes to run this example, right?
>>
>> I was looking for any pointers on where to go from here, or maybe existing examples of similar setups
>> (I couldn't find any).
>>
>> Best,
>>
>> Dani
>>
>