Hello,

I have set up Flink and Kafka containers with docker-compose to test how Flink 
processes Kafka messages.
I primarily want to check how the Table API works, but also how the DataStream 
API would process the Kafka messages.

I have included the main part of the docker-compose.yaml file I am using for 
the Flink cluster. I have commented out some of the JAR files I have tried, 
but I have mainly used the flink-sql-connector-kafka JAR. Could you confirm 
whether any of my configuration is wrong, which JARs I should be using for 
each API, and whether a single JAR is enough for both the Table and DataStream 
APIs?
I have also included the Flink client module I have used for both the Table 
and the DataStream APIs, together with the error messages.
Any idea what is missing, or whether any configuration looks wrong?

docker-compose.yml:

flink_jobmanager:
  image: flink:1.18.1-scala_2.12
  container_name: flink_jobmanager
  ports:
    - "8081:8081"
  command: jobmanager
  volumes:
    - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
    # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
  networks:
    - standard
  depends_on:
    - kafka

flink_taskmanager:
  image: flink:1.18.1-scala_2.12
  container_name: flink_taskmanager
  volumes:
    - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
    # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
  depends_on:
    - kafka
    - jobmanager
  command: taskmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
      taskmanager.numberOfTaskSlots: 2
  networks:
    - standard
Table API:

from pyflink.table import EnvironmentSettings, TableEnvironment


def process_table():
    print('Running the Table job now.')

    env_settings = (
        EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        # .with_configuration(config)
        .build()
    )
    t_env = TableEnvironment.create(env_settings)

    t_env.execute_sql(
        """
        CREATE TABLE kafka_test_logs (
            action2 STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'test_logs',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'flink_group',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
            'json.ignore-parse-errors' = 'true'
        )
        """)

    t_env.execute_sql("""
        SELECT COUNT(*) AS message_count
        FROM kafka_test_logs
    """).print()

    print('Table job has now completed.')


if __name__ == "__main__":
    process_table()
error:
flink_app | Caused by: org.apache.flink.table.api.ValidationException: Could not 
find any factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
flink_app |
flink_app | Available factory identifiers are:
flink_app |
flink_app | blackhole
flink_app | datagen
flink_app | filesystem
flink_app | print
flink_app | python-input-format
The '/opt/flink/lib' directory contains the JARs shipped with the image as well 
as the JARs I added in the docker-compose.yml file.
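For reference, my understanding is that the connector JAR can also be passed to the Table job from the Python side via the 'pipeline.jars' option; the path below is the one inside the container from the mount in my docker-compose.yml, and I am not sure whether this is needed (or correct) when the JAR is already in /opt/flink/lib:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Point the job at the connector JAR explicitly; the 'file://' scheme is
# required. The path assumes the volume mount from the docker-compose.yml above.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)
```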

Stream API:

import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer


def process_stream():
    env = StreamExecutionEnvironment.get_execution_environment()

    properties = {
        "bootstrap.servers": 'kafka:9092',
        "group.id": "test_group"
    }

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_logs',
        deserialization_schema=SimpleStringSchema(),
        properties=properties)

    data_stream = env.add_source(kafka_consumer)

    parsed_stream = data_stream.map(lambda x: json.loads(x),
                                    output_type=Types.ROW([Types.STRING()]))
    parsed_stream.print()

    env.execute("Kafka DataStream Test")


if __name__ == "__main__":
    process_stream()
error:
TypeError: Could not found the Java class 
'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java 
dependencies could be specified via command line argument '--jarfile' or the 
config option 'pipeline.jars'
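For reference, my understanding of the 'pipeline.jars' option the error mentions is that the JAR can be registered from the DataStream job itself; again the path is the one inside the container, assuming the mount from my docker-compose.yml above:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Make the Kafka connector classes visible to the PyFlink job;
# the 'file://' scheme is required.
env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")
```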

I have used the JAR dependencies based on:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis

Kind regards
Phil
