Thanks for the tips, Martijn!

I've pinned the library versions to 1.16 everywhere and also decided to
scrap PyFlink and go with the sql-client instead, to keep things simpler
for now.

This is the Dockerfile I am using for both the *jobmanager* and the
*sql-client*:

FROM flink:1.16.2-scala_2.12-java11

# download and unpack Hadoop next to the Flink distribution
RUN APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop \
    && HADOOP_VERSION=3.3.5 \
    && wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz \
    && tar xzvf hadoop-${HADOOP_VERSION}.tar.gz \
    && HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}

ENV HADOOP_CLASSPATH /opt/flink/hadoop-3.3.5/etc/hadoop:/opt/flink/hadoop-3.3.5/share/hadoop/common/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/common/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/hdfs/*:/opt/flink/hadoop-3.3.5/share/hadoop/mapreduce/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/lib/*:/opt/flink/hadoop-3.3.5/share/hadoop/yarn/*

COPY lib/flink-json-1.16.1.jar /opt/flink/lib/
COPY lib/flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar /opt/flink/lib/
COPY lib/flink-sql-connector-kafka-1.16.2.jar /opt/flink/lib/
COPY lib/iceberg-flink-runtime-1.16-1.3.0.jar /opt/flink/lib/
COPY lib/iceberg-hive-runtime-1.3.0.jar /opt/flink/lib/
COPY lib/hive-metastore-3.1.3.jar /opt/flink/lib/
COPY lib/hadoop-aws-3.3.5.jar /opt/flink/lib/
COPY lib/aws-java-sdk-bundle-1.12.316.jar /opt/flink/lib/

COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/

WORKDIR /opt/flink
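
One note on the RUN step above: the HADOOP_HOME assignment there only lives inside that single RUN layer, which is why I ended up spelling out HADOOP_CLASSPATH by hand in the ENV line. A sketch of what I think a cleaner variant would look like (assuming the tarball really is unpacked under /opt/flink, the image's working directory):

# hypothetical alternative: keep HADOOP_HOME as an image-level ENV
ENV HADOOP_HOME /opt/flink/hadoop-3.3.5
# the classpath could then be derived at container start instead of being hardcoded, e.g.
#   export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)

Not sure whether that matters for the problem below, but it would make the Dockerfile easier to keep in sync with HADOOP_VERSION.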

I start the sql-client via */opt/flink/bin/sql-client.sh embedded*

I am able to create a sink table with the iceberg connector using the
following query:

create table if not exists clicks_ib (
  `timestamp` STRING,
  event STRING,
  user_id STRING,
  site_id STRING,
  url STRING,
  on_site_seconds INT,
  viewed_percent INT
) with (
  'connector'='iceberg',
  'catalog-name'='hive_catalog',
  'uri'='thrift://hivemetastore:9083',
  'warehouse'='s3a://iceberg'
);
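
For completeness, the catalog-first route from my earlier pyflink attempt would presumably look like this in the sql-client (just a sketch, assuming the same metastore URI and warehouse):

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hivemetastore:9083',
  'warehouse'='s3a://iceberg'
);
USE CATALOG hive_catalog;

but for now I've stuck with the inline connector properties shown above.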

But when I try to select from it, I run into the following error:

Flink SQL> select * from default_catalog.default_database.clicks_ib;

[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.RuntimeException:
java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found

I'm still a bit confused about which jars go where, and I'm not sure what
exactly is missing.
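
My current reading of the plugins page Martijn linked (an assumption on my part, not something I've verified) is that each filesystem plugin needs its own subdirectory under /opt/flink/plugins rather than being dropped into the plugins folder directly, so the last COPY in the Dockerfile would become something like:

RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop
COPY lib/flink-s3-fs-hadoop-1.16.1.jar /opt/flink/plugins/s3-fs-hadoop/

but I'm not certain that alone explains the S3AFileSystem error.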

For reference, I'll paste the current full docker-compose.yml below.

version: "3.7"
services:

  sqlclient:
    container_name: sqlclient
    build: .
    command:
      - /opt/flink/bin/sql-client.sh
      - embedded
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    volumes:
      - ./flink-sql:/etc/sql

  jobmanager:
    build: .
    hostname: "jobmanager"
    container_name: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1

  taskmanager:
    image: flink:1.16.2-scala_2.12-java11
    hostname: "taskmanager"
    container_name: "taskmanager"
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1

  mariadb:
    image: 'mariadb:latest'
    hostname: mariadb
    container_name: mariadb
    ports:
      - '3306:3306'
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: admin
      MYSQL_PASSWORD: admin
      MYSQL_DATABASE: metastore_db
    volumes:
      - ./mariadb-data:/var/lib/mysql

  hivemetastore:
    hostname: hivemetastore
    container_name: hivemetastore
    build:
      context: hive
    ports:
      - '9083:9083'
    environment:
      METASTORE_DB_HOSTNAME: mariadb
    depends_on:
      - mariadb

  minio:
    hostname: "minio"
    image: "minio/minio:latest"
    container_name: "minio"
    ports:
      - "9001:9001"
      - "9000:9000"
    command:
      - "server"
      - "/data"
      - "--console-address"
      - ":9001"
    volumes:
      - "minio:/data"
    environment:
      MINIO_ROOT_USER: "minio"
      MINIO_ROOT_PASSWORD: "minio123"
    networks:
      default:
        aliases:
          - iceberg.minio

  mc:
    depends_on:
      - "minio"
    image: "minio/mc"
    container_name: "mc"
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo "...waiting..." && sleep 1; done;
      /usr/bin/mc rm -r --force minio/iceberg;
      /usr/bin/mc mb minio/iceberg;
      /usr/bin/mc policy set public minio/iceberg;
      tail -f /dev/null
      "

  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  # schema-registry:
  #   image: confluentinc/cp-schema-registry:7.4.0
  #   hostname: schema-registry
  #   container_name: schema-registry
  #   depends_on:
  #     - broker
  #     - controller
  #   ports:
  #     - "8081:8081"
  #   environment:
  #     SCHEMA_REGISTRY_HOST_NAME: schema-registry
  #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
  #     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - controller
      # - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      # CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

volumes:
  minio: null

networks:
  default:
    name: flinkberg

Best,

Dani

On Thu, Jun 29, 2023 at 9:06 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Dani,
>
> There are two things that I notice:
>
> 1. You're mixing different Flink versions (1.16 and 1.17): all Flink
> artifacts should be from the same Flink version
> 2. S3 plugins need to be added to the plugins folder of Flink, because
> they are loaded via the plugin mechanism. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/
>
> Best regards,
>
> Martijn
>
> On Sat, Jun 24, 2023 at 1:22 PM Dániel Pálma <dani...@gmail.com> wrote:
>
>> Hey folks,
>>
>> Nice to meet ya'll!
>>
>> I'm trying to get the following stack up and running locally:
>>
>> - Kafka as source
>> - pyFlink SQL
>> - Iceberg on top of MinIO
>>
>> The goal is to have a pyflink script that reads data from a Kafka topic,
>> does some transformations, and dumps it into an iceberg table.
>>
>> I have everything, except for the pyflink app running in Docker, defined
>> in a docker-compose.yml:
>>
>> version: "3.7"
>> services:
>>
>> mariadb:
>> image: 'mariadb:latest'
>> hostname: mariadb
>> container_name: mariadb
>> ports:
>> - '3306:3306'
>> environment:
>> MYSQL_ROOT_PASSWORD: admin
>> MYSQL_USER: admin
>> MYSQL_PASSWORD: admin
>> MYSQL_DATABASE: metastore_db
>> volumes:
>> - ./mariadb-data:/var/lib/mysql
>> networks:
>> iceberg_net:
>>
>> hive-metastore:
>> hostname: hive-metastore
>> container_name: hive-metastore
>> build:
>> context: hive
>> ports:
>> - '9083:9083'
>> environment:
>> METASTORE_DB_HOSTNAME: mariadb
>> depends_on:
>> - mariadb
>> networks:
>> iceberg_net:
>>
>> minio:
>> hostname: "minio"
>> image: "minio/minio:latest"
>> container_name: "minio"
>> ports:
>> - "9001:9001"
>> - "9000:9000"
>> command:
>> - "server"
>> - "/data"
>> - "--console-address"
>> - ":9001"
>> volumes:
>> - "minio:/data"
>> environment:
>> MINIO_ROOT_USER: "minio"
>> MINIO_ROOT_PASSWORD: "minio123"
>> networks:
>> iceberg_net:
>> aliases:
>> - iceberg.minio
>>
>> mc:
>> depends_on:
>> - "minio"
>> image: "minio/mc"
>> container_name: "mc"
>> entrypoint: >
>> /bin/sh -c "
>> until (/usr/bin/mc config host add minio http://minio:9000 minio
>> minio123) do echo "...waiting..." && sleep 1; done;
>> /usr/bin/mc rm -r --force minio/iceberg;
>> /usr/bin/mc mb minio/iceberg;
>> /usr/bin/mc policy set public minio/iceberg;
>> tail -f /dev/null
>> "
>> networks:
>> iceberg_net:
>>
>> broker:
>> image: confluentinc/cp-kafka:7.4.0
>> hostname: broker
>> container_name: broker
>> depends_on:
>> - controller
>> ports:
>> - "9092:9092"
>> - "9101:9101"
>> environment:
>> KAFKA_NODE_ID: 1
>> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
>> 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
>> KAFKA_ADVERTISED_LISTENERS:
>> 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
>> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>> KAFKA_JMX_PORT: 9101
>> KAFKA_JMX_HOSTNAME: localhost
>> KAFKA_PROCESS_ROLES: 'broker'
>> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>> KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
>> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>> KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
>> # Replace CLUSTER_ID with a unique base64 UUID using
>> "bin/kafka-storage.sh random-uuid"
>> # See
>> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>> networks:
>> iceberg_net:
>>
>> controller:
>> image: confluentinc/cp-kafka:7.4.0
>> hostname: controller
>> container_name: controller
>> ports:
>> - "9093:9093"
>> - "9102:9102"
>> environment:
>> KAFKA_NODE_ID: 2
>> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>> KAFKA_JMX_PORT: 9102
>> KAFKA_JMX_HOSTNAME: localhost
>> KAFKA_PROCESS_ROLES: 'controller'
>> KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
>> KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
>> KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
>> KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
>> KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
>> # Replace CLUSTER_ID with a unique base64 UUID using
>> "bin/kafka-storage.sh random-uuid"
>> # See
>> https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
>> CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
>> networks:
>> iceberg_net:
>>
>> schema-registry:
>> image: confluentinc/cp-schema-registry:7.4.0
>> hostname: schema-registry
>> container_name: schema-registry
>> depends_on:
>> - broker
>> - controller
>> ports:
>> - "8081:8081"
>> environment:
>> SCHEMA_REGISTRY_HOST_NAME: schema-registry
>> SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
>> SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
>> networks:
>> iceberg_net:
>>
>> control-center:
>> image: confluentinc/cp-enterprise-control-center:7.4.0
>> hostname: control-center
>> container_name: control-center
>> depends_on:
>> - broker
>> - controller
>> - schema-registry
>> ports:
>> - "9021:9021"
>> environment:
>> CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
>> CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081";
>> CONTROL_CENTER_REPLICATION_FACTOR: 1
>> CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
>> CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
>> CONFLUENT_METRICS_TOPIC_REPLICATION: 1
>> PORT: 9021
>> networks:
>> iceberg_net:
>>
>> networks:
>> iceberg_net:
>>
>> volumes:
>> minio: null
>>
>>
>> My PyFlink program looks like this currently:
>>
>> def demo():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     env.enable_checkpointing(1000)
>>
>>     for jar in FLINK_JARS:
>>         print(f"Adding jar: {jar}")
>>         env.add_jars(f"file://{jar}")
>>
>>     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>>     t_env.execute_sql("""
>>         CREATE CATALOG hive_catalog WITH (
>>             'type'='iceberg',
>>             'catalog-type'='hive',
>>             'uri'='thrift://localhost:9083',
>>             'warehouse'='s3a://iceberg'
>>         )
>>     """)
>>
>>     t_env.use_catalog("hive_catalog")
>>
>>     t_env.execute_sql("""
>>         create database if not exists iceberg
>>     """)
>>
>>
>> These jars are added to the streaming environment:
>>
>> - aws-java-sdk-bundle-1.12.316.jar
>> - flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
>> - hive-metastore-3.1.3.jar
>> - flink-s3-fs-hadoop-1.16.1.jar
>> - flink-sql-connector-kafka-1.16.2.jar
>> - iceberg-flink-runtime-1.16-1.3.0.jar
>> - flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
>> - hadoop-aws-3.3.5.jar
>> - iceberg-hive-runtime-1.3.0.jar
>>
>> The metastore operates with the following versions:
>>
>> HADOOP_VERSION=3.3.5
>> METASTORE_VERSION=3.1.3
>>
>> And finally, this is the error I am currently running into:
>>
>> *pyflink.util.exceptions.TableException:
>> org.apache.flink.table.api.TableException: Could not execute CREATE
>> DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog],
>> databaseName: [iceberg], ignoreIfExists: [true])        at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
>>       at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>       at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)        at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>       at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>       at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>       at
>> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>>     at
>> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>       at
>> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>>       at
>> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>>       at java.base/java.lang.Thread.run(Thread.java:829)Caused by:
>> java.lang.RuntimeException: Failed to create namespace iceberg in Hive
>> Metastore        at
>> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
>>       at
>> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
>>       at
>> org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
>>       at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
>>       ... 12 moreCaused by:
>> MetaException(message:java.lang.RuntimeException:
>> java.lang.ClassNotFoundException: Class
>> org.apache.hadoop.fs.s3a.S3AFileSystem not found)        at
>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
>>       at
>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
>>       at
>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
>>       at
>> org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
>> at
>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
>>       at
>> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
>>       at
>> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
>>       at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)        at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>       at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
>>       at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)        at
>> org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
>>       at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
>>   at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
>> at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
>>       at
>> org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
>>       ... 15 more*
>>
>> I also have the HADOOP classpath added to the environment before running
>> the script via: *export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop
>> classpath`*
>> I'm not well versed in Java but as far as I understand including the
>> `hadoop-aws` jar in the flink runtime should provide the necessary classes
>> to run this example, right?
>>
>> I was looking for any pointers on where to go from here, or maybe
>> existing examples of similar setups (I couldn't find any).
>>
>> Best,
>>
>> Dani
>>
>
