Hi Phil,

It should be totally OK to use `python -m flink_client.job`; it just seems to me that the flink cli is used more often. And yes, you also need to add the SQL connector jar to the flink_client container. After putting the jar in your client container, add code like `table_env.get_config().set("pipeline.jars", "file:///path/in/container/to/connector.jar")` to your job.py. Then Flink can see the connector jar on the client side, and the jar will be uploaded to the cluster as well. See the Jar Dependencies Management doc <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/> for more details.
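To make this concrete, the top of job.py could look roughly like the sketch below. I am assuming the jar is copied to /opt/flink/lib/ inside your flink_client image; adjust the path to wherever you actually place it.

from pyflink.table import EnvironmentSettings, TableEnvironment

def process_table():
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = TableEnvironment.create(env_settings)

    # Point the client at the connector jar inside the client container
    # (example path); the jar is also shipped to the cluster with the job.
    t_env.get_config().set(
        "pipeline.jars",
        "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")

    # ... your existing CREATE TABLE and SELECT statements stay unchanged ...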
Best,
Biao Geng

On Wed, 10 Apr 2024 at 22:04, Phil Stavridis <phi...@gmail.com> wrote:

> Hi Biao,
>
> 1. I have a Flink client container like this:
>
> # Flink client
> flink_client:
>   container_name: flink_client
>   image: flink-client:local
>   build:
>     context: .
>     dockerfile: flink_client/Dockerfile
>   networks:
>     - standard
>   depends_on:
>     - jobmanager
>     - kafka
>
> The flink_client/Dockerfile uses this bash script, which only runs the Flink
> client once the Flink cluster is up and running:
>
> Dockerfile:
> ENTRYPOINT ["./run_when_ready.sh"]
>
> run_when_ready.sh:
>
> #!/bin/bash
>
> wait_for_jobmanager() {
>     echo "Waiting for Flink JobManager to be ready..."
>     while ! nc -z jobmanager 8081; do
>         sleep 1
>     done
>     echo "Flink JobManager is ready."
> }
>
> # Call the function
> wait_for_jobmanager
>
> # Now run the PyFlink job
> python -m flink_client.job
>
> flink_client/job.py is the file I attached earlier, which has:
>
> if __name__ == "__main__":
>     process_table()
>
> So it runs that file, which should submit the Table job to the Flink
> cluster, as that file runs:
>
> t_env.execute_sql("""
>     SELECT COUNT(*) AS message_count
>     FROM kafka_test_logs
> """).print()
>
> I don't use flink run -t ...
> Is this the wrong way to run PyFlink jobs?
>
> Also, do I need to also add the flink-sql-connector.jar to the
> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and instead
> of relying on environment.execute_sql(…), do I need to run it with
> flink run -t and the JAR file as you mentioned earlier?
>
>
> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
>
> Hi Phil,
>
> Your code looks good. I meant: how do you run the python script? Maybe you
> are using the flink cli, i.e. running commands like `flink run -t .. -py job.py
> -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j
> /path/to/flink-sql-kafka-connector.jar` is necessary so that on the client
> side, flink can generate the job graph successfully.
>
> As for the second question, in your case, yes, your client's env needs to
> have the flink-sql-kafka-connector JAR. You can check the doc
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
> for more details. In short, your yaml shows that you are using session
> mode, which needs the connector jar to generate the job graph on the client
> side.
>
>
> Best,
> Biao Geng
>
> On Wed, 10 Apr 2024 at 18:14, Phil Stavridis <phi...@gmail.com> wrote:
>
>> Hi Biao,
>>
>> For submitting the job, I run t_env.execute_sql.
>> Shouldn't that be sufficient for submitting the job using the Table API
>> with PyFlink? Isn't that the recommended way for submitting and running
>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
>> issues, but there is no Running Job when the container flink_app, which
>> has the client code, runs.
>> I could submit a job, collect, using the Flink SQL console, and it was
>> available on the Running Jobs tab, but that was also not ingesting data
>> from the Kafka topic.
>> Does my client's env need to have the flink-sql-kafka-connector JAR, or
>> is it needed just in the classpath of the Flink JobManager and TaskManager?
>>
>> Kind regards
>> Phil
>>
>>
>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>
>> Hi Phil,
>>
>> Thanks for sharing the detailed information of the job. For your question:
>> how do you submit the job? After applying your yaml file, I think you will
>> successfully launch a flink cluster with 1 JM and 1 TM.
>> Then you would submit the pyflink job to the flink cluster. As the error
>> you showed is a client-side error, it is possible that your client's env
>> does not contain the flink-sql-kafka-connector jar, which may lead to the
>> exception.
>> By the way, the "Table API" code in your mail is actually using the
>> flink SQL API, so the flink-sql-kafka-connector jar is required, which is
>> exactly what you have prepared. For pyflink's Table API, you can have a
>> look at this document:
>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>
>> Best,
>> Biao Geng
>>
>>
>> On Wed, 10 Apr 2024 at 03:10, Phil Stavridis <phi...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have set up Flink and Kafka containers using docker-compose, for
>>> testing how Flink works for processing Kafka messages.
>>> I primarily want to check how the Table API works, but also how the
>>> Stream API would process the Kafka messages.
>>>
>>> I have included the main part of the docker-compose.yaml file I am using
>>> for the Flink cluster. I have commented out some of the JAR files I have
>>> tried, but I have mainly used the flink-sql-connector-kafka JAR. I would
>>> like to confirm whether I am using any wrong configuration, which JARs I
>>> should be using for each API, and whether I just need one JAR for both
>>> the Table and DataStream APIs.
>>> I have also included the Flink client module I have used for both the
>>> Table and the DataStream APIs, and the error messages.
>>> Any idea what is missing or if there is any configuration that seems
>>> wrong?
>>>
>>> docker-compose.yml:
>>>
>>> flink_jobmanager:
>>>   image: flink:1.18.1-scala_2.12
>>>   container_name: flink_jobmanager
>>>   ports:
>>>     - "8081:8081"
>>>   command: jobmanager
>>>   volumes:
>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>   environment:
>>>     - |
>>>       FLINK_PROPERTIES=
>>>       jobmanager.rpc.address: jobmanager
>>>   networks:
>>>     - standard
>>>   depends_on:
>>>     - kafka
>>>
>>> flink_taskmanager:
>>>   image: flink:1.18.1-scala_2.12
>>>   container_name: flink_taskmanager
>>>   volumes:
>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>   depends_on:
>>>     - kafka
>>>     - jobmanager
>>>   command: taskmanager
>>>   environment:
>>>     - |
>>>       FLINK_PROPERTIES=
>>>       jobmanager.rpc.address: jobmanager
>>>       taskmanager.numberOfTaskSlots: 2
>>>   networks:
>>>     - standard
>>>
>>> Table API:
>>>
>>> def process_table():
>>>     print('Running the Table job now.')
>>>     env_settings = (
>>>         EnvironmentSettings
>>>         .new_instance()
>>>         .in_streaming_mode()
>>>         # .with_configuration(config)
>>>         .build()
>>>     )
>>>     t_env = TableEnvironment.create(env_settings)
>>>
>>>     t_env.execute_sql(
>>>         f"""
>>>         CREATE TABLE kafka_test_logs (
>>>             action2 STRING
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'test_logs',
>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>             'properties.group.id' = 'flink_group',
>>>             'scan.startup.mode' = 'earliest-offset',
>>>             'format' = 'json',
>>>             'json.ignore-parse-errors' = 'true'
>>>         )
>>>         """)
>>>
>>>     t_env.execute_sql("""
>>>         SELECT COUNT(*) AS message_count
>>>         FROM kafka_test_logs
>>>     """).print()
>>>
>>>     print('Table job has now completed.')
>>>
>>>
>>> if __name__ == "__main__":
>>>     process_table()
>>>
>>> error:
>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
>>> Could not find any factory for identifier 'kafka' that implements
>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>> flink_app |
>>> flink_app | Available factory identifiers are:
>>> flink_app |
>>> flink_app | blackhole
>>> flink_app | datagen
>>> flink_app | filesystem
>>> flink_app | print
>>> flink_app | python-input-format
>>>
>>> The '/opt/flink/lib' directory has the JARs from the image as well as the
>>> JARs I added in the docker-compose.yml file.
>>>
>>> Stream API:
>>>
>>> def process_stream():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>
>>>     properties = {
>>>         "bootstrap.servers": 'kafka:9092',
>>>         "group.id": "test_group"
>>>     }
>>>
>>>     kafka_consumer = FlinkKafkaConsumer(
>>>         topics='test_logs',
>>>         deserialization_schema=SimpleStringSchema(),
>>>         properties=properties)
>>>
>>>     data_stream = env.add_source(kafka_consumer)
>>>
>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>     parsed_stream.print()
>>>
>>>     env.execute("Kafka DataStream Test")
>>>
>>>
>>> if __name__ == "__main__":
>>>     process_stream()
>>>
>>> error:
>>> TypeError: Could not found the Java class
>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java
>>> dependencies could be specified via command line argument '--jarfile' or
>>> the config option 'pipeline.jars'
>>>
>>> I have used the JAR dependencies based on:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>
>>> Kind regards
>>> Phil
>>>
>>
>>
>
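For the DataStream version of the job from the original message, the same idea applies: the client has to be able to see the Kafka connector classes before FlinkKafkaConsumer is created. A minimal sketch, again assuming the connector jar sits at /opt/flink/lib/ inside the client container:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer

def process_stream():
    env = StreamExecutionEnvironment.get_execution_environment()

    # Register the connector jar with the client before creating the consumer
    # (example path); it is uploaded to the cluster together with the job.
    env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")

    kafka_consumer = FlinkKafkaConsumer(
        topics='test_logs',
        deserialization_schema=SimpleStringSchema(),
        properties={"bootstrap.servers": "kafka:9092", "group.id": "test_group"})

    env.add_source(kafka_consumer).print()
    env.execute("Kafka DataStream Test")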