Hello, I have set up Flink and Kafka containers with docker-compose to test how Flink processes Kafka messages. I primarily want to check how the Table API works, but also how the DataStream API would process the Kafka messages.
I have included the main part of the docker-compose.yaml file I am using for the Flink cluster. I have commented out some of the JAR files I have tried, but I have mainly used the flink-sql-connector-kafka JAR. I would like to confirm whether any of my configuration is wrong, which JARs I should be using for each API, and whether a single JAR is enough for both the Table and DataStream APIs. I have also included the Flink client module I have used for both the Table and the DataStream APIs, together with the error messages. Any idea what is missing or which configuration looks wrong?

docker-compose.yml:

```yaml
flink_jobmanager:
  image: flink:1.18.1-scala_2.12
  container_name: flink_jobmanager
  ports:
    - "8081:8081"
  command: jobmanager
  volumes:
    - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
    # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
  networks:
    - standard
  depends_on:
    - kafka

flink_taskmanager:
  image: flink:1.18.1-scala_2.12
  container_name: flink_taskmanager
  volumes:
    - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
    # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
    # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
  depends_on:
    - kafka
    - jobmanager
  command: taskmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
      taskmanager.numberOfTaskSlots: 2
  networks:
    - standard
```

Table API:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment


def process_table():
    print('Running the Table job now.')

    env_settings = (
        EnvironmentSettings
        .new_instance()
        .in_streaming_mode()
        # .with_configuration(config)
        .build()
    )
    t_env = TableEnvironment.create(env_settings)

    t_env.execute_sql(
        f"""
        CREATE TABLE kafka_test_logs (
            action2 STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'test_logs',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'flink_group',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
            'json.ignore-parse-errors' = 'true'
        )
        """)

    t_env.execute_sql("""
        SELECT COUNT(*) AS message_count
        FROM kafka_test_logs
    """).print()

    print('Table job has now completed.')


if __name__ == "__main__":
    process_table()
```

error:

```
flink_app | Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
flink_app |
flink_app | Available factory identifiers are:
flink_app |
flink_app | blackhole
flink_app | datagen
flink_app | filesystem
flink_app | print
flink_app | python-input-format
```

The /opt/flink/lib directory contains the JARs that ship with the image plus the JARs I added in the docker-compose.yml file.
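One thing I am unsure about is whether the PyFlink client (the flink_app container) also needs the connector JAR registered explicitly, since the JAR mounts above only cover the jobmanager and taskmanager. A minimal sketch of what I mean, assuming the JAR is also mounted at the same path inside the client container:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Assumed path: the same mount point used for the jobmanager/taskmanager JARs.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)
```

Is something like this needed for the Table API, or should the JARs in /opt/flink/lib be picked up automatically?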
Stream API:

```python
import json

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer


def process_stream():
    env = StreamExecutionEnvironment.get_execution_environment()

    properties = {
        "bootstrap.servers": 'kafka:9092',
        "group.id": "test_group"
    }

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_logs',
        deserialization_schema=SimpleStringSchema(),
        properties=properties)

    data_stream = env.add_source(kafka_consumer)
    parsed_stream = data_stream.map(lambda x: json.loads(x),
                                    output_type=Types.ROW([Types.STRING()]))
    parsed_stream.print()

    env.execute("Kafka DataStream Test")


if __name__ == "__main__":
    process_stream()
```

error:

```
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
```

I chose the JAR dependencies based on:
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
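Following the hint about 'pipeline.jars' in that error, I considered registering the JAR on the client side for the DataStream job as well. A rough sketch of what I had in mind (the path is an assumption based on my compose mounts, and I am not sure this is the right approach):

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Assumed path: same mount point as in the docker-compose file.
env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")
```

Would this be the recommended way to make the connector classes visible to PyFlink, or is mounting the JAR into /opt/flink/lib supposed to be enough?

Kind regards
Phil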