Hi Dani,

Plugins need to be placed in their own folder inside the plugins directory; I think that might be the problem.
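As a concrete sketch of that layout (the subdirectory name below is my own choice; the point is only that each plugin jar sits one level below plugins/, not directly in plugins/ itself):

```shell
# Sketch: a Flink filesystem plugin must live in its own subfolder under
# plugins/. Simulated here with a scratch directory standing in for /opt/flink;
# the jar is an empty placeholder file and all names are illustrative.
mkdir -p /tmp/flink-demo/plugins/s3-fs-hadoop
touch /tmp/flink-demo/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.16.2.jar
# Flink scans plugins/*/ for jars, so this layout is discoverable:
ls /tmp/flink-demo/plugins/s3-fs-hadoop
```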
Best regards,

Martijn

On Sun, Jul 9, 2023 at 7:00 PM Dániel Pálma <dani...@gmail.com> wrote:

> Thanks for the tips, Martijn!
>
> I've fixed the library versions to 1.16 everywhere and have also decided to
> scrap pyflink and go for the sql-client instead, to keep things simpler for now.
>
> This is the Dockerfile I am using for both the *jobmanager* and the *sql-client*:
>
> FROM flink:1.16.2-scala_2.12-java11
>
> RUN APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/ \
>     && HADOOP_VERSION=3.3.5 \
>     && wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz \
>     && tar xzvf hadoop-${HADOOP_VERSION}.tar.gz \
>     && HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
>
> ENV HADOOP_CLASSPATH /opt/flink/hadoop-3.3.5/etc/hadoop:/opt/flink/hadoop-3.3.5/share/hadoop/common/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/common/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/*:/opt/flink/hadoop-3.3.5/share/hadoop/mapreduce/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/*
>
> COPY lib/flink-json-1.16.1.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar /opt/flink/lib/
> COPY lib/flink-sql-connector-kafka-1.16.2.jar /opt/flink/lib/
> COPY lib/iceberg-flink-runtime-1.16-1.3.0.jar /opt/flink/lib/
> COPY lib/iceberg-hive-runtime-1.3.0.jar /opt/flink/lib/
> COPY lib/hive-metastore-3.1.3.jar /opt/flink/lib/
> COPY lib/hadoop-aws-3.3.5.jar /opt/flink/lib/
> COPY lib/aws-java-sdk-bundle-1.12.316.jar /opt/flink/lib/
>
> COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/
>
> WORKDIR /opt/flink
>
> I start the sql-client via */opt/flink/bin/sql-client.sh embedded*.
>
> I am able to create a sink table with the Iceberg connector using the
> following query:
>
> create table if not exists clicks_ib
> (
>     `timestamp` STRING,
>     event STRING,
>     user_id STRING,
>     site_id STRING,
>     url STRING,
>     on_site_seconds INT,
>     viewed_percent INT
> ) with (
>     'connector'='iceberg',
>     'catalog-name'='hive_catalog',
>     'uri'='thrift://hivemetastore:9083',
>     'warehouse'='s3a://iceberg'
> );
>
> But when I try to select from it, I run into the following error:
>
> *Flink SQL> select * from default_catalog.default_database.clicks_ib;*
>
> *[ERROR] Could not execute SQL statement. Reason:
> org.apache.hadoop.hive.metastore.api.MetaException:
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.s3a.S3AFileSystem not found*
>
> I feel like I'm still a little confused about where to place which jars,
> but I'm not exactly sure what is missing.
>
> For reference, I'll paste the current full docker-compose.yml below.
>
> version: "3.7"
> services:
>
>   sqlclient:
>     container_name: sqlclient
>     build: .
>     command:
>       - /opt/flink/bin/sql-client.sh
>       - embedded
>     depends_on:
>       - jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         rest.address: jobmanager
>     volumes:
>       - ./flink-sql:/etc/sql
>
>   jobmanager:
>     build: .
>     hostname: "jobmanager"
>     container_name: "jobmanager"
>     expose:
>       - "6123"
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>       - AWS_ACCESS_KEY_ID=minio
>       - AWS_SECRET_ACCESS_KEY=minio123
>       - AWS_REGION=us-east-1
>
>   taskmanager:
>     image: flink:1.16.2-scala_2.12-java11
>     hostname: "taskmanager"
>     container_name: "taskmanager"
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     links:
>       - jobmanager:jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=jobmanager
>       - AWS_ACCESS_KEY_ID=minio
>       - AWS_SECRET_ACCESS_KEY=minio123
>       - AWS_REGION=us-east-1
>
>   mariadb:
>     image: 'mariadb:latest'
>     hostname: mariadb
>     container_name: mariadb
>     ports:
>       - '3306:3306'
>     environment:
>       MYSQL_ROOT_PASSWORD: admin
>       MYSQL_USER: admin
>       MYSQL_PASSWORD: admin
>       MYSQL_DATABASE: metastore_db
>     volumes:
>       - ./mariadb-data:/var/lib/mysql
>
>   hivemetastore:
>     hostname: hivemetastore
>     container_name: hivemetastore
>     build:
>       context: hive
>     ports:
>       - '9083:9083'
>     environment:
>       METASTORE_DB_HOSTNAME: mariadb
>     depends_on:
>       - mariadb
>
>   minio:
>     hostname: "minio"
>     image: "minio/minio:latest"
>     container_name: "minio"
>     ports:
>       - "9001:9001"
>       - "9000:9000"
>     command:
>       - "server"
>       - "/data"
>       - "--console-address"
>       - ":9001"
>     volumes:
>       - "minio:/data"
>     environment:
>       MINIO_ROOT_USER: "minio"
>       MINIO_ROOT_PASSWORD: "minio123"
>     networks:
>       default:
>         aliases:
>           - iceberg.minio
>
>   mc:
>     depends_on:
>       - "minio"
>     image: "minio/mc"
>     container_name: "mc"
>     entrypoint: >
>       /bin/sh -c "
>       until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..." && sleep 1; done;
>       /usr/bin/mc rm -r --force minio/iceberg;
>       /usr/bin/mc mb minio/iceberg;
>       /usr/bin/mc policy set public minio/iceberg;
>       tail -f /dev/null
>       "
>
>   broker:
>     image: confluentinc/cp-kafka:7.4.0
>     hostname: broker
>     container_name: broker
>     depends_on:
>       - controller
>     ports:
>       - "9092:9092"
>       - "9101:9101"
>     environment:
>       KAFKA_NODE_ID: 1
>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_JMX_PORT: 9101
>       KAFKA_JMX_HOSTNAME: localhost
>       KAFKA_PROCESS_ROLES: 'broker'
>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>       KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>
>   controller:
>     image: confluentinc/cp-kafka:7.4.0
>     hostname: controller
>     container_name: controller
>     ports:
>       - "9093:9093"
>       - "9102:9102"
>     environment:
>       KAFKA_NODE_ID: 2
>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>       KAFKA_JMX_PORT: 9102
>       KAFKA_JMX_HOSTNAME: localhost
>       KAFKA_PROCESS_ROLES: 'controller'
>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>       KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>       KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>
>   # schema-registry:
>   #   image: confluentinc/cp-schema-registry:7.4.0
>   #   hostname: schema-registry
>   #   container_name: schema-registry
>   #   depends_on:
>   #     - broker
>   #     - controller
>   #   ports:
>   #     - "8081:8081"
>   #   environment:
>   #     SCHEMA_REGISTRY_HOST_NAME: schema-registry
>   #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>   #     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>
>   control-center:
>     image: confluentinc/cp-enterprise-control-center:7.4.0
>     hostname: control-center
>     container_name: control-center
>     depends_on:
>       - broker
>       - controller
>       # - schema-registry
>     ports:
>       - "9021:9021"
>     environment:
>       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>       # CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
>       CONTROL_CENTER_REPLICATION_FACTOR: 1
>       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
>       PORT: 9021
>
> volumes:
>   minio: null
>
> networks:
>   default:
>     name: flinkberg
>
> Best,
>
> Dani
>
> On Thu, Jun 29, 2023 at 9:06 AM Martijn Visser <martijnvis...@apache.org> wrote:
>
>> Hi Dani,
>>
>> There are two things that I notice:
>>
>> 1. You're mixing different Flink versions (1.16 and 1.17): all Flink
>>    artifacts should be from the same Flink version.
>> 2. S3 plugins need to be added to the plugins folder of Flink, because
>>    they are loaded via the plugin mechanism. See
>>    https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>>
>> Best regards,
>>
>> Martijn
>>
>> On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma <dani...@gmail.com> wrote:
>>
>>> Hey folks,
>>>
>>> Nice to meet y'all!
>>> I'm trying to get the following stack up and running locally:
>>>
>>> - Kafka as source
>>> - pyFlink SQL
>>> - Iceberg on top of MinIO
>>>
>>> The goal is to have a pyflink script that reads data from a Kafka topic,
>>> does some transformations, and dumps it into an Iceberg table.
>>>
>>> I have everything, except for the pyflink app running in Docker, defined
>>> in a docker-compose.yml:
>>>
>>> version: "3.7"
>>> services:
>>>
>>>   mariadb:
>>>     image: 'mariadb:latest'
>>>     hostname: mariadb
>>>     container_name: mariadb
>>>     ports:
>>>       - '3306:3306'
>>>     environment:
>>>       MYSQL_ROOT_PASSWORD: admin
>>>       MYSQL_USER: admin
>>>       MYSQL_PASSWORD: admin
>>>       MYSQL_DATABASE: metastore_db
>>>     volumes:
>>>       - ./mariadb-data:/var/lib/mysql
>>>     networks:
>>>       iceberg_net:
>>>
>>>   hive-metastore:
>>>     hostname: hive-metastore
>>>     container_name: hive-metastore
>>>     build:
>>>       context: hive
>>>     ports:
>>>       - '9083:9083'
>>>     environment:
>>>       METASTORE_DB_HOSTNAME: mariadb
>>>     depends_on:
>>>       - mariadb
>>>     networks:
>>>       iceberg_net:
>>>
>>>   minio:
>>>     hostname: "minio"
>>>     image: "minio/minio:latest"
>>>     container_name: "minio"
>>>     ports:
>>>       - "9001:9001"
>>>       - "9000:9000"
>>>     command:
>>>       - "server"
>>>       - "/data"
>>>       - "--console-address"
>>>       - ":9001"
>>>     volumes:
>>>       - "minio:/data"
>>>     environment:
>>>       MINIO_ROOT_USER: "minio"
>>>       MINIO_ROOT_PASSWORD: "minio123"
>>>     networks:
>>>       iceberg_net:
>>>         aliases:
>>>           - iceberg.minio
>>>
>>>   mc:
>>>     depends_on:
>>>       - "minio"
>>>     image: "minio/mc"
>>>     container_name: "mc"
>>>     entrypoint: >
>>>       /bin/sh -c "
>>>       until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..." && sleep 1; done;
>>>       /usr/bin/mc rm -r --force minio/iceberg;
>>>       /usr/bin/mc mb minio/iceberg;
>>>       /usr/bin/mc policy set public minio/iceberg;
>>>       tail -f /dev/null
>>>       "
>>>     networks:
>>>       iceberg_net:
>>>
>>>   broker:
>>>     image: confluentinc/cp-kafka:7.4.0
>>>     hostname: broker
>>>     container_name: broker
>>>     depends_on:
>>>       - controller
>>>     ports:
>>>       - "9092:9092"
>>>       - "9101:9101"
>>>     environment:
>>>       KAFKA_NODE_ID: 1
>>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>>>       KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>>       KAFKA_JMX_PORT: 9101
>>>       KAFKA_JMX_HOSTNAME: localhost
>>>       KAFKA_PROCESS_ROLES: 'broker'
>>>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>>       KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>>>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>>       KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>>>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>>>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>>     networks:
>>>       iceberg_net:
>>>
>>>   controller:
>>>     image: confluentinc/cp-kafka:7.4.0
>>>     hostname: controller
>>>     container_name: controller
>>>     ports:
>>>       - "9093:9093"
>>>       - "9102:9102"
>>>     environment:
>>>       KAFKA_NODE_ID: 2
>>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>>       KAFKA_JMX_PORT: 9102
>>>       KAFKA_JMX_HOSTNAME: localhost
>>>       KAFKA_PROCESS_ROLES: 'controller'
>>>       KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>>>       KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>>>       KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>>>       KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>>>       KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>>>       # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
>>>       # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>>>       CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>>>     networks:
>>>       iceberg_net:
>>>
>>>   schema-registry:
>>>     image: confluentinc/cp-schema-registry:7.4.0
>>>     hostname: schema-registry
>>>     container_name: schema-registry
>>>     depends_on:
>>>       - broker
>>>       - controller
>>>     ports:
>>>       - "8081:8081"
>>>     environment:
>>>       SCHEMA_REGISTRY_HOST_NAME: schema-registry
>>>       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>>>       SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>>>     networks:
>>>       iceberg_net:
>>>
>>>   control-center:
>>>     image: confluentinc/cp-enterprise-control-center:7.4.0
>>>     hostname: control-center
>>>     container_name: control-center
>>>     depends_on:
>>>       - broker
>>>       - controller
>>>       - schema-registry
>>>     ports:
>>>       - "9021:9021"
>>>     environment:
>>>       CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>>>       CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
>>>       CONTROL_CENTER_REPLICATION_FACTOR: 1
>>>       CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>>>       CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>>>       CONFLUENT_METRICS_TOPIC_REPLICATION: 1
>>>       PORT: 9021
>>>     networks:
>>>       iceberg_net:
>>>
>>> networks:
>>>   iceberg_net:
>>>
>>> volumes:
>>>   minio: null
>>>
>>> My PyFlink program currently looks like this:
>>>
>>> def demo():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>     env.enable_checkpointing(1000)
>>>
>>>     for jar in FLINK_JARS:
>>>         print(f"Adding jar: {jar}")
>>>         env.add_jars(f"file://{jar}")
>>>
>>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>>>     t_env.execute_sql("""
>>>     CREATE CATALOG
>>> hive_catalog WITH (
>>>         'type'='iceberg',
>>>         'catalog-type'='hive',
>>>         'uri'='thrift://localhost:9083',
>>>         'warehouse'='s3a://iceberg'
>>>     )
>>>     """)
>>>
>>>     t_env.use_catalog("hive_catalog")
>>>
>>>     t_env.execute_sql("""
>>>     create database if not exists iceberg
>>>     """)
>>>
>>> These jars are added to the streaming environment:
>>>
>>> - aws-java-sdk-bundle-1.12.316.jar
>>> - flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
>>> - hive-metastore-3.1.3.jar
>>> - flink-s3-fs-hadoop-1.16.1.jar
>>> - flink-sql-connector-kafka-1.16.2.jar
>>> - iceberg-flink-runtime-1.16-1.3.0.jar
>>> - flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
>>> - hadoop-aws-3.3.5.jar
>>> - iceberg-hive-runtime-1.3.0.jar
>>>
>>> The metastore operates with the following versions:
>>>
>>> HADOOP_VERSION=3.3.5
>>> METASTORE_VERSION=3.1.3
>>>
>>> And finally, this is the error I am currently running into:
>>>
>>> *pyflink.util.exceptions.TableException:
>>> org.apache.flink.table.api.TableException: Could not execute CREATE
>>> DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog],
>>> databaseName: [iceberg], ignoreIfExists: [true])
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>> Caused by: java.lang.RuntimeException: Failed to create namespace iceberg in Hive Metastore
>>>     at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
>>>     at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
>>>     at org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
>>>     ... 12 more
>>> Caused by: MetaException(message:java.lang.RuntimeException:
>>> java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found)
>>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
>>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
>>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
>>>     at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
>>>     at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
>>>     at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>     at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
>>>     at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)
>>>     at org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
>>>     at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
>>>     at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
>>>     at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
>>>     at org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
>>>     ... 15 more*
>>>
>>> I also have the HADOOP classpath added to the environment before running
>>> the script via *export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`*.
>>> I'm not well versed in Java, but as far as I understand, including the
>>> `hadoop-aws` jar in the Flink runtime should provide the necessary classes
>>> to run this example, right?
>>>
>>> I was looking for any pointers on where to go from here, or maybe
>>> existing examples of similar setups (I couldn't find any).
>>>
>>> Best,
>>>
>>> Dani
>>
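[Editor's note] One hedged observation on the stack trace above: the MetaException is deserialized while the client reads the Thrift *response* (`ThriftHiveMetastore$Client.recv_create_database`), which suggests the ClassNotFoundException is actually raised inside the Hive Metastore process, not in Flink. If that reading is right, hadoop-aws and the AWS SDK bundle would also need to be on the metastore's classpath, not only in Flink's lib/. A check along these lines could be run against the metastore container (container name and lib path are assumptions):

```shell
# Illustrative only; against a real deployment it might look like:
#   docker exec hive-metastore ls /opt/hive-metastore/lib | grep -E 'hadoop-aws|aws-java-sdk'
# Simulated here with a scratch directory standing in for the metastore's lib/,
# populated with empty placeholder files:
mkdir -p /tmp/metastore-lib
touch /tmp/metastore-lib/hadoop-aws-3.3.5.jar \
      /tmp/metastore-lib/aws-java-sdk-bundle-1.12.316.jar
# Count how many of the required S3 jars are present (expect both):
ls /tmp/metastore-lib | grep -cE 'hadoop-aws|aws-java-sdk'
```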