Hi Biao,

1. I have a Flink client container like this:

  # Flink client
  flink_client:
    container_name: flink_client
    image: flink-client:local
    build:
      context: .
      dockerfile: flink_client/Dockerfile
    networks:
      - standard
    depends_on:
      - jobmanager
      - kafka
2. The flink_client/Dockerfile runs this bash script, which starts the Flink client only once the Flink cluster is up and running:

Dockerfile:

  ENTRYPOINT ["./run_when_ready.sh"]

run_when_ready.sh:

  #!/bin/bash

  wait_for_jobmanager() {
      echo "Waiting for Flink JobManager to be ready..."
      while ! nc -z jobmanager 8081; do
          sleep 1
      done
      echo "Flink JobManager is ready."
  }

  # Wait until the JobManager accepts connections on the REST port
  wait_for_jobmanager

  # Now run the PyFlink job
  python -m flink_client.job

flink_client/job.py is the file I attached earlier, which has:

>>>> if __name__ == "__main__":
>>>>     process_table()

So it runs that file, which should submit the Table job to the Flink cluster, as that file runs:

>>>> t_env.execute_sql("""
>>>>     SELECT COUNT(*) AS message_count
>>>>     FROM kafka_test_logs
>>>> """).print()

I don't use flink run -t …

Is this the wrong way to run PyFlink jobs?

Also, do I need to add the flink-sql-connector JAR to the flink_client/Dockerfile, i.e.

  COPY /libs/*.jar /opt/flink/lib/

and, instead of relying on t_env.execute_sql(…), do I need to run the job with flink run -t … and the JAR file, as you mentioned earlier?
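In case it helps clarify the question: my (possibly wrong) understanding of the pipeline.jars alternative is something like the rough sketch below, with the path assuming the connector JAR has been copied into the client image under /opt/flink/lib:

  from pyflink.table import EnvironmentSettings, TableEnvironment

  env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
  t_env = TableEnvironment.create(env_settings)

  # Register the Kafka connector JAR with the client, so the client can
  # generate the job graph; the path is an assumption based on the COPY above.
  t_env.get_config().set(
      "pipeline.jars",
      "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar"
  )

Would setting pipeline.jars like that be equivalent to passing -j on the flink run command line?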
> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
> 
> Hi Phil,
> 
> Your code looks good. I meant: how do you run the Python script? Maybe you
> are using the Flink CLI, i.e. running commands like `flink run -t .. -py
> job.py -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, the
> `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that on the
> client side, Flink can generate the job graph successfully.
> 
> As for the second question, in your case, yes, your client's env needs to
> have the flink-sql-kafka-connector JAR. You can check the doc
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
> for more details. In short, your yaml shows that you are using session
> mode, which needs the connector jar to generate the job graph on the
> client side.
> 
> Best,
> Biao Geng
> 
> On Wed, 10 Apr 2024 at 18:14, Phil Stavridis <phi...@gmail.com> wrote:
>> Hi Biao,
>> 
>> For submitting the job, I run t_env.execute_sql.
>> Shouldn't that be sufficient for submitting the job using the Table API
>> with PyFlink? Isn't that the recommended way for submitting and running
>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
>> issues, but there is no running job when the container flink_app, which
>> has the client code, runs.
>> I could submit a job (a collect) using the Flink SQL console, and it was
>> available on the Running Jobs tab, but it was also not ingesting data
>> from the Kafka topic.
>> Does my client's env need to have the flink-sql-kafka-connector JAR? Or
>> is it needed just on the classpath of the Flink JobManager and
>> TaskManager?
>> 
>> Kind regards
>> Phil
>> 
>> 
>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>> 
>>> Hi Phil,
>>> 
>>> Thanks for sharing the detailed information about the job. As for your
>>> question: how do you submit the job? After applying your yaml file, I
>>> think you will successfully launch a Flink cluster with 1 JM and 1 TM.
>>> Then you would submit the PyFlink job to the Flink cluster. As the error
>>> you showed is a client-side error, it is possible that your client's env
>>> does not contain the flink-sql-kafka-connector jar, which may lead to
>>> the exception.
>>> By the way, the "Table API" code in your mail is actually using the
>>> Flink SQL API, so the flink-sql-kafka-connector jar is required, which
>>> is exactly what you have prepared. For PyFlink's Table API, you can have
>>> a look at this document:
>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> 
>>> On Wed, 10 Apr 2024 at 03:10, Phil Stavridis <phi...@gmail.com> wrote:
>>>> Hello,
>>>> 
>>>> I have set up Flink and Kafka containers using docker-compose, for
>>>> testing how Flink works for processing Kafka messages.
>>>> I primarily want to check how the Table API works, but also how the
>>>> DataStream API would process the Kafka messages.
>>>> 
>>>> I have included the main part of the docker-compose.yaml file I am
>>>> using for the Flink cluster. I have commented out some of the JAR files
>>>> I have tried, but I have mainly used the flink-sql-connector-kafka JAR.
>>>> I would like to confirm whether I am using any wrong configuration,
>>>> which JARs I should be using for each API, and whether I need just one
>>>> JAR for both the Table and DataStream APIs.
>>>> I have also included the Flink client module I have used for both the
>>>> Table and the DataStream APIs, along with the error messages.
>>>> Any idea what is missing or if there is any configuration that seems
>>>> wrong?
>>>> 
>>>> docker-compose.yml:
>>>> 
>>>> flink_jobmanager:
>>>>   image: flink:1.18.1-scala_2.12
>>>>   container_name: flink_jobmanager
>>>>   ports:
>>>>     - "8081:8081"
>>>>   command: jobmanager
>>>>   volumes:
>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>   environment:
>>>>     - |
>>>>       FLINK_PROPERTIES=
>>>>       jobmanager.rpc.address: jobmanager
>>>>   networks:
>>>>     - standard
>>>>   depends_on:
>>>>     - kafka
>>>> 
>>>> flink_taskmanager:
>>>>   image: flink:1.18.1-scala_2.12
>>>>   container_name: flink_taskmanager
>>>>   volumes:
>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>   depends_on:
>>>>     - kafka
>>>>     - jobmanager
>>>>   command: taskmanager
>>>>   environment:
>>>>     - |
>>>>       FLINK_PROPERTIES=
>>>>       jobmanager.rpc.address: jobmanager
>>>>       taskmanager.numberOfTaskSlots: 2
>>>>   networks:
>>>>     - standard
>>>> 
>>>> Table API:
>>>> 
>>>> def process_table():
>>>>     print('Running the Table job now.')
>>>> 
>>>>     env_settings = (
>>>>         EnvironmentSettings
>>>>         .new_instance()
>>>>         .in_streaming_mode()
>>>>         # .with_configuration(config)
>>>>         .build()
>>>>     )
>>>>     t_env = TableEnvironment.create(env_settings)
>>>> 
>>>>     t_env.execute_sql(
>>>>         f"""
>>>>         CREATE TABLE kafka_test_logs (
>>>>             action2 STRING
>>>>         ) WITH (
>>>>             'connector' = 'kafka',
>>>>             'topic' = 'test_logs',
>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>             'properties.group.id' = 'flink_group',
>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>             'format' = 'json',
>>>>             'json.ignore-parse-errors' = 'true'
>>>>         )
>>>>         """)
>>>> 
>>>>     t_env.execute_sql("""
>>>>         SELECT COUNT(*) AS message_count
>>>>         FROM kafka_test_logs
>>>>     """).print()
>>>> 
>>>>     print('Table job has now completed.')
>>>> 
>>>> 
>>>> if __name__ == "__main__":
>>>>     process_table()
>>>> 
>>>> error:
>>>> 
>>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
>>>> Could not find any factory for identifier 'kafka' that implements
>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>> flink_app |
>>>> flink_app | Available factory identifiers are:
>>>> flink_app |
>>>> flink_app | blackhole
>>>> flink_app | datagen
>>>> flink_app | filesystem
>>>> flink_app | print
>>>> flink_app | python-input-format
>>>> 
>>>> The '/opt/flink/lib' directory has both the JARs from the image and the
>>>> JARs I added in the docker-compose.yml file.
>>>> 
>>>> DataStream API:
>>>> 
>>>> def process_stream():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>> 
>>>>     properties = {
>>>>         "bootstrap.servers": 'kafka:9092',
>>>>         "group.id": "test_group"
>>>>     }
>>>> 
>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>         topics='test_logs',
>>>>         deserialization_schema=SimpleStringSchema(),
>>>>         properties=properties)
>>>> 
>>>>     data_stream = env.add_source(kafka_consumer)
>>>> 
>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>     parsed_stream.print()
>>>> 
>>>>     env.execute("Kafka DataStream Test")
>>>> 
>>>> if __name__ == "__main__":
>>>>     process_stream()
>>>> 
>>>> error:
>>>> 
>>>> TypeError: Could not found the Java class
>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The
>>>> Java dependencies could be specified via command line argument
>>>> '--jarfile' or the config option 'pipeline.jars'
>>>> 
>>>> I have used the JAR dependencies based on:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>> 
>>>> Kind regards
>>>> Phil
>>
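P.S. Regarding the DataStream error at the bottom of the thread: since that TypeError itself points at 'pipeline.jars', I assume the connector JAR also has to be registered with the client there, e.g. via env.add_jars(). Below is a rough, untested sketch of how I would now write it, using the newer KafkaSource API instead of the deprecated FlinkKafkaConsumer; the JAR path is again my assumption:

  from pyflink.common import WatermarkStrategy
  from pyflink.common.serialization import SimpleStringSchema
  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource


  def process_stream():
      env = StreamExecutionEnvironment.get_execution_environment()
      # Make the connector JAR available to the client for job graph
      # generation (path assumes the JAR is present inside the client
      # container).
      env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")

      source = (
          KafkaSource.builder()
          .set_bootstrap_servers("kafka:9092")
          .set_topics("test_logs")
          .set_group_id("test_group")
          .set_starting_offsets(KafkaOffsetsInitializer.earliest())
          .set_value_only_deserializer(SimpleStringSchema())
          .build()
      )

      # Print the raw JSON strings; the json.loads parsing step is left out
      # for brevity.
      env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka_test_logs").print()

      env.execute("Kafka DataStream Test")


  if __name__ == "__main__":
      process_stream()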