Hey folks,

Nice to meet y'all!

I'm trying to get the following stack up and running locally:

- Kafka as source
- pyFlink SQL
- Iceberg on top of MinIO

The goal is to have a pyflink script that reads data from a Kafka topic,
does some transformations, and dumps it into an iceberg table.

I have everything, except for the pyflink app running in Docker, defined in
a docker-compose.yml:

version: "3.7"

services:

  mariadb:
    image: 'mariadb:latest'
    hostname: mariadb
    container_name: mariadb
    ports:
      - '3306:3306'
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: admin
      MYSQL_PASSWORD: admin
      MYSQL_DATABASE: metastore_db
    volumes:
      - ./mariadb-data:/var/lib/mysql
    networks:
      iceberg_net:

  hive-metastore:
    hostname: hive-metastore
    container_name: hive-metastore
    build:
      context: hive
    ports:
      - '9083:9083'
    environment:
      METASTORE_DB_HOSTNAME: mariadb
    depends_on:
      - mariadb
    networks:
      iceberg_net:

  minio:
    hostname: "minio"
    image: "minio/minio:latest"
    container_name: "minio"
    ports:
      - "9001:9001"
      - "9000:9000"
    command:
      - "server"
      - "/data"
      - "--console-address"
      - ":9001"
    volumes:
      - "minio:/data"
    environment:
      MINIO_ROOT_USER: "minio"
      MINIO_ROOT_PASSWORD: "minio123"
    networks:
      iceberg_net:
        # Extra DNS alias so S3A virtual-host-style lookups can resolve
        # the bucket endpoint inside the compose network.
        aliases:
          - iceberg.minio

  mc:
    depends_on:
      - "minio"
    image: "minio/mc"
    container_name: "mc"
    # Folded scalar: the lines below are joined into a single shell command.
    # Inner quoting uses single quotes so it does not terminate the outer
    # double-quoted `sh -c "..."` string.
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123)
      do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/iceberg;
      /usr/bin/mc mb minio/iceberg;
      /usr/bin/mc policy set public minio/iceberg;
      tail -f /dev/null
      "
    networks:
      iceberg_net:

  broker:
    image: confluentinc/cp-kafka:7.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - controller
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    networks:
      iceberg_net:

  controller:
    image: confluentinc/cp-kafka:7.4.0
    hostname: controller
    container_name: controller
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '2@controller:9093'
      KAFKA_LISTENERS: 'CONTROLLER://controller:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-controller-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    networks:
      iceberg_net:

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
      - controller
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      iceberg_net:

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - controller
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      iceberg_net:

networks:
  iceberg_net:

volumes:
  minio: null


My PyFlink program looks like this currently:

def demo():
    """Set up a local PyFlink job targeting an Iceberg/Hive catalog.

    Creates a streaming environment, loads the connector jars, registers
    an Iceberg catalog backed by the Hive metastore (thrift://localhost:9083)
    with its warehouse on MinIO (s3a://iceberg), and creates the target
    database if it does not already exist.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # Checkpoint every 1000 ms.
    env.enable_checkpointing(1000)

    # FLINK_JARS is defined elsewhere in the script; each entry must be an
    # absolute path so the file:// URL resolves correctly.
    for jar in FLINK_JARS:
        print(f"Adding jar: {jar}")
        env.add_jars(f"file://{jar}")

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.execute_sql("""
        CREATE CATALOG hive_catalog WITH (
            'type'='iceberg',
            'catalog-type'='hive',
            'uri'='thrift://localhost:9083',
            'warehouse'='s3a://iceberg'
        )
    """)

    t_env.use_catalog("hive_catalog")

    t_env.execute_sql("""
        create database if not exists iceberg
    """)


These jars are added to the streaming environment:

- aws-java-sdk-bundle-1.12.316.jar
- flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
- hive-metastore-3.1.3.jar
- flink-s3-fs-hadoop-1.16.1.jar
- flink-sql-connector-kafka-1.16.2.jar
- iceberg-flink-runtime-1.16-1.3.0.jar
- flink-sql-connector-hive-3.1.2_2.12-1.16.2.jar
- hadoop-aws-3.3.5.jar
- iceberg-hive-runtime-1.3.0.jar

The metastore operates with the following versions:

HADOOP_VERSION=3.3.5
METASTORE_VERSION=3.1.3

And finally, this is the error I am currently running into:








































*pyflink.util.exceptions.TableException:
org.apache.flink.table.api.TableException: Could not execute CREATE
DATABASE: (catalogDatabase: [{}], catalogName: [hive_catalog],
databaseName: [iceberg], ignoreIfExists: [true])        at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1125)
      at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
      at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)        at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
      at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
      at java.base/java.lang.Thread.run(Thread.java:829)Caused by:
java.lang.RuntimeException: Failed to create namespace iceberg in Hive
Metastore        at
org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:294)
      at
org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:230)
      at
org.apache.iceberg.flink.FlinkCatalog.createDatabase(FlinkCatalog.java:221)
      at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1117)
      ... 12 moreCaused by:
MetaException(message:java.lang.RuntimeException:
java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.s3a.S3AFileSystem not found)        at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39343)
      at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result$create_database_resultStandardScheme.read(ThriftHiveMetastore.java:39311)
      at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$create_database_result.read(ThriftHiveMetastore.java:39245)
      at
org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_create_database(ThriftHiveMetastore.java:1106)
      at
org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.create_database(ThriftHiveMetastore.java:1093)
      at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:811)
      at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)        at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
      at com.sun.proxy.$Proxy36.createDatabase(Unknown Source)        at
org.apache.iceberg.hive.HiveCatalog.lambda$createNamespace$8(HiveCatalog.java:283)
      at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:58)
  at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
      at
org.apache.iceberg.hive.HiveCatalog.createNamespace(HiveCatalog.java:281)
      ... 15 more*

I also have the HADOOP classpath added to the environment before running
the script via: *export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`*
I'm not well versed in Java but as far as I understand including the
`hadoop-aws` jar in the flink runtime should provide the necessary classes
to run this example, right?

I was looking for any pointers on where to go from here, or maybe existing
examples of similar setups (I couldn't find any).

Best,

Dani

Reply via email to