Hi Phil,

Thanks for sharing the detailed information about the job. One question back to you: how do you submit the job? After applying your yaml file, you should end up with a running Flink cluster with 1 JM and 1 TM, and you would then submit the PyFlink job to that cluster. Since the error you showed is a client-side error, it is possible that your client's environment does not contain the flink-sql-connector-kafka jar, which would lead to exactly this exception.

By the way, the "Table API" code in your mail actually uses the Flink SQL API, so the flink-sql-connector-kafka jar is the right one to use, which is exactly what you have prepared. For PyFlink's Table API, you can have a look at this example: https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
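If the missing jar on the client side is indeed the cause, one quick way to rule it out is to point the PyFlink client at the connector jar explicitly via 'pipeline.jars'. Below is a minimal sketch; the file path is an assumption, so adjust it to wherever the jar lives in the environment that runs your Python code:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Make the Kafka SQL connector visible to the client as well.
# 'pipeline.jars' expects file:// URLs; the path below is only an example.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)

If you submit the job with the flink CLI instead, the same jar can also be passed via the '--jarfile' argument that your DataStream error message mentions.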
Best,
Biao Geng

Phil Stavridis <phi...@gmail.com> wrote on Wed, Apr 10, 2024 at 03:10:

> Hello,
>
> I have set up Flink and Kafka containers using docker-compose, for testing
> how Flink works for processing Kafka messages.
> I primarily want to check how the Table API works but also how the Stream
> API would process the Kafka messages.
>
> I have included the main part of the docker-compose.yaml file I am using
> for the Flink cluster. I have commented out some of the JAR files I have
> tried out but I have mainly used the flink-sql-connector-kafka JAR. I would
> like to confirm if I am using any wrong configurations, which JARs I should
> be using for each API, and if I just need to use one JAR for both the Table
> and Datastream APIs.
> I have also included the Flink client module I have used for both the
> Table and the Datastream APIs and the error messages.
> Any idea what is missing or if there is any configuration that seems wrong?
>
> docker-compose.yml
> flink_jobmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_jobmanager
>   ports:
>     - "8081:8081"
>   command: jobmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>   networks:
>     - standard
>   depends_on:
>     - kafka
>
> flink_taskmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_taskmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   depends_on:
>     - kafka
>     - jobmanager
>   command: taskmanager
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>       taskmanager.numberOfTaskSlots: 2
>   networks:
>     - standard
>
> Table API:
> def process_table():
>
>     print('Running the Table job now.')
>     env_settings = (
>         EnvironmentSettings
>         .new_instance()
>         .in_streaming_mode()
>         # .with_configuration(config)
>         .build()
>     )
>     t_env = TableEnvironment.create(env_settings)
>
>     t_env.execute_sql(
>         f"""
>         CREATE TABLE kafka_test_logs (
>             action2 STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'test_logs',
>             'properties.bootstrap.servers' = 'kafka:9092',
>             'properties.group.id' = 'flink_group',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'json',
>             'json.ignore-parse-errors' = 'true'
>         )
>         """)
>
>     t_env.execute_sql("""
>         SELECT COUNT(*) AS message_count
>         FROM kafka_test_logs
>     """).print()
>
>     print('Table job has now completed.')
>
>
> if __name__ == "__main__":
>     process_table()
>
> error:
> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> flink_app |
> flink_app | Available factory identifiers are:
> flink_app |
> flink_app | blackhole
> flink_app | datagen
> flink_app | filesystem
> flink_app | print
> flink_app | python-input-format
>
> The '/opt/flink/lib' has the JARs from both the image and the JARs I added
> in the docker-compose.yml file.
>
> Stream API:
> def process_stream():
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     properties = {
>         "bootstrap.servers": 'kafka:9092',
>         "group.id": "test_group"
>     }
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test_logs',
>         deserialization_schema=SimpleStringSchema(),
>         properties=properties)
>
>     data_stream = env.add_source(kafka_consumer)
>
>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>                                     output_type=Types.ROW([Types.STRING()]))
>     parsed_stream.print()
>
>     env.execute("Kafka DataStream Test")
>
> if __name__ == "__main__":
>     process_stream()
>
> error:
> TypeError: Could not found the Java class
> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java
> dependencies could be specified via command line argument '--jarfile' or
> the config option 'pipeline.jars'
>
> I have used the JAR dependencies based on:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>
> Kind regards
> Phil