Thanks, Biao.

Kind regards
Phil
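(For readers skimming the thread below: the setup that ended up working submits the PyFlink script from the client container with the Flink CLI rather than with a bare `python` call, and keeps the Kafka connector jar in /opt/flink/lib of both the client image and the cluster image. A condensed sketch, with illustrative paths:

    # Run from inside the client container; rest.address points at the
    # jobmanager via FLINK_PROPERTIES, as in the compose file quoted below.
    flink run -py /opt/flink/read_kafka.py

The pipeline.jars / pipeline.classpaths settings suggested further down in the thread serve the same purpose of making the connector visible on the client side when the job graph is generated.)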
> On 14 Apr 2024, at 18:04, Biao Geng <biaoge...@gmail.com> wrote:
>
> Hi Phil,
>
> You can check my github link <https://github.com/bgeng777/pyflink-learning/tree/main/pyflink-using-docker> for a detailed tutorial and example code :).
>
> Best,
> Biao Geng
>
> Phil Stavridis <phi...@gmail.com> wrote on Fri, 12 Apr 2024 at 19:10:
>
>> Hi Biao,
>>
>> Thanks for looking into it and providing a detailed example.
>> I am not sure I am following some of the setup, and I just want to make sure I run what you have tested without messing anything up, especially how you create the Docker images etc. Do you have this committed somewhere, or could you perhaps provide a JAR file with the implementation? Thanks.
>>
>>> On 11 Apr 2024, at 19:43, Biao Geng <biaoge...@gmail.com> wrote:
>>>
>>> Hi Phil,
>>> I tested it in my local docker environment and found that we do need to use "flink run" to submit the job to the session cluster. Simply using `python xx.py` may just launch a local mini cluster and would not submit the job to the cluster you created. Also note that all required dependencies (e.g. the kafka connector) need to be available on the cluster as well as on the client.
>>>
>>> Here is my test code, which works in my env; hope this helps:
>>>
>>> my flink-cluster-and-client.yaml:
>>>
>>> version: "2.2"
>>> services:
>>>   jobmanager:
>>>     image: pyflink:latest
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>   taskmanager:
>>>     image: pyflink:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>>   client:
>>>     image: pyflink-client:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: /opt/flink/run.sh
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         rest.address: jobmanager
>>>         rest.bind-address: jobmanager
>>>     scale: 1
>>>
>>> my read_kafka.py:
>>>
>>> from pyflink.table import (EnvironmentSettings, TableEnvironment)
>>>
>>> def process_table():
>>>     env_settings = (
>>>         EnvironmentSettings
>>>         .new_instance()
>>>         .in_streaming_mode()
>>>         .build()
>>>     )
>>>     t_env = TableEnvironment.create(env_settings)
>>>
>>>     t_env.execute_sql(
>>>         f"""
>>>         CREATE TABLE kafka_input_topic (
>>>             action STRING
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'input_topic',
>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>             'properties.group.id' = 'test',
>>>             'scan.startup.mode' = 'earliest-offset',
>>>             'format' = 'json',
>>>             'json.ignore-parse-errors' = 'true'
>>>         )
>>>         """)
>>>
>>>     t_env.execute_sql(
>>>         f"""
>>>         CREATE TABLE print_sink (
>>>             action STRING
>>>         ) WITH (
>>>             'connector' = 'print'
>>>         )
>>>         """)
>>>
>>>     t_env.execute_sql("""
>>>         INSERT INTO print_sink SELECT * FROM kafka_input_topic
>>>     """).print()
>>>
>>> if __name__ == "__main__":
>>>     process_table()
>>>
>>> the run.sh is just one line that uses `flink run`:
>>>
>>> #!/bin/bash
>>> flink run -py read_kafka.py
>>>
>>> my dockerfile to build the pyflink:latest image:
>>>
>>> FROM flink:latest
>>>
>>> # install python3 and pip3
>>> RUN apt-get update -y && \
>>>     apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
>>> RUN ln -s /usr/bin/python3 /usr/bin/python
>>>
>>> # install PyFlink
>>> COPY apache-flink*.tar.gz /
>>> RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install /apache-flink*.tar.gz
>>> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
>>>
>>> my dockerfile to build the pyflink-client:latest image:
>>>
>>> FROM pyflink:latest
>>> COPY read_kafka.py /opt/flink/
>>> COPY run.sh /opt/flink
>>> RUN chmod 777 /opt/flink/run.sh
>>>
>>> Possible output (dependent on the content of the kafka topic):
>>> <image.png>
>>>
>>> Best,
>>> Biao Geng
>>>
>>> Biao Geng <biaoge...@gmail.com> wrote on Thu, 11 Apr 2024 at 09:53:
>>>
>>>> Hi Phil,
>>>> "should this be run in the Flink JobManager?" It should be fine to run it in your client container.
>>>> "examples of running flink run in a Dockerized application" This sql client example <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-sql-client-with-session-cluster> is a good reference.
>>>> It is somewhat strange to learn that pipeline.jars does not work. I will try to run the pyflink-with-kafka example in docker locally later today. Besides that, I see that in all 3 of your containers the connector jar is located at /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar; could you please try using `table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar")` in your python script?
>>>>
>>>> Best,
>>>> Biao Geng
>>>>
>>>> Phil Stavridis <phi...@gmail.com> wrote on Thu, 11 Apr 2024 at 00:49:
>>>>
>>>>> Hi Biao,
>>>>>
>>>>> I will check out running with flink run, but should this be run in the Flink JobManager? Would that mean that the container for the Flink JobManager would require both Python installed and a copy of the flink_client.py module? Are there some examples of running flink run in a Dockerized application instead of the local CLI?
>>>>> I have added pipeline.jars in the flink_client module, but it is still complaining about not finding kafka and the DynamicTableFactory class on the classpath.
>>>>> Thanks for your feedback.
>>>>>
>>>>> Kind regards
>>>>> Phil
>>>>>
>>>>>> On 10 Apr 2024, at 16:21, Biao Geng <biaoge...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Phil,
>>>>>> It should be totally ok to use `python -m flink_client.job`. It just seems to me that the flink cli is used more often.
>>>>>> And yes, you also need to add the sql connector jar to the flink_client container. After putting the jar in your client container, add code like `table_env.get_config().set("pipeline.jars", "file:///path/in/container/to/connector.jar")` to your job.py. Then on the client side flink can see the connector jar, and the jar will be uploaded to the cluster as well. See the doc on Jar Dependencies Management <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/> for more details.
>>>>>>
>>>>>> Best,
>>>>>> Biao Geng
>>>>>>
>>>>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 22:04:
>>>>>>
>>>>>>> Hi Biao,
>>>>>>>
>>>>>>> 1. I have a Flink client container like this:
>>>>>>>
>>>>>>> # Flink client
>>>>>>> flink_client:
>>>>>>>   container_name: flink_client
>>>>>>>   image: flink-client:local
>>>>>>>   build:
>>>>>>>     context: .
>>>>>>>     dockerfile: flink_client/Dockerfile
>>>>>>>   networks:
>>>>>>>     - standard
>>>>>>>   depends_on:
>>>>>>>     - jobmanager
>>>>>>>     - kafka
>>>>>>>
>>>>>>> The flink_client/Dockerfile runs this bash script, which only starts the Flink client once the Flink cluster is up and running:
>>>>>>>
>>>>>>> Dockerfile:
>>>>>>> ENTRYPOINT ["./run_when_ready.sh"]
>>>>>>>
>>>>>>> run_when_ready.sh:
>>>>>>>
>>>>>>> #!/bin/bash
>>>>>>>
>>>>>>> wait_for_jobmanager() {
>>>>>>>     echo "Waiting for Flink JobManager to be ready..."
>>>>>>>     while ! nc -z jobmanager 8081; do
>>>>>>>         sleep 1
>>>>>>>     done
>>>>>>>     echo "Flink JobManager is ready."
>>>>>>> }
>>>>>>>
>>>>>>> # Call the function
>>>>>>> wait_for_jobmanager
>>>>>>>
>>>>>>> # Now run the PyFlink job
>>>>>>> python -m flink_client.job
>>>>>>>
>>>>>>> flink_client.job.py is the file I attached earlier which has:
>>>>>>>
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_table()
>>>>>>>
>>>>>>> So it runs that file, which should submit the Table job to the Flink cluster, as that file runs:
>>>>>>>
>>>>>>>>>>> t_env.execute_sql("""
>>>>>>>>>>>     SELECT COUNT(*) AS message_count
>>>>>>>>>>>     FROM kafka_test_logs
>>>>>>>>>>> """).print()
>>>>>>>
>>>>>>> I don’t use flink run -t …
>>>>>>> Is this the wrong way to run PyFlink jobs?
>>>>>>>
>>>>>>> Also, do I also need to add the flink-sql-connector.jar to the flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead of relying on environment.execute_sql(…), do I need to run it with flink run -t and the JAR file as you mentioned earlier?
>>>>>>>
>>>>>>>> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Phil,
>>>>>>>>
>>>>>>>> Your code looks good. What I meant was: how do you run the python script? Maybe you are using the flink cli, i.e. running commands like `flink run -t .. -py job.py -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that on the client side flink can generate the job graph successfully.
>>>>>>>>
>>>>>>>> As for the second question, in your case, yes, your client's env needs to have the flink-sql-kafka-connector JAR. You can check the doc <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes> for more details. In short, your yaml shows that you are using session mode, which needs the connector jar to generate the job graph on the client side.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Biao Geng
>>>>>>>>
>>>>>>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 18:14:
>>>>>>>>
>>>>>>>>> Hi Biao,
>>>>>>>>>
>>>>>>>>> For submitting the job, I run t_env.execute_sql.
>>>>>>>>> Shouldn’t that be sufficient for submitting the job using the Table API with PyFlink? Isn’t that the recommended way of submitting and running PyFlink jobs on a running Flink cluster? The Flink cluster runs without issues, but there is no Running Job when the container flink_app, which has the client code, runs.
>>>>>>>>> I could submit a job (a collect) using the Flink SQL console, and it was available on the Running Jobs tab, but that was also not ingesting data from the Kafka topic.
>>>>>>>>> Does my client’s env need to have the flink-sql-kafka-connector JAR?
>>>>>>>>> Is that needed just on the classpath of the Flink JobManager and TaskManager?
>>>>>>>>>
>>>>>>>>> Kind regards
>>>>>>>>> Phil
>>>>>>>>>
>>>>>>>>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Phil,
>>>>>>>>>>
>>>>>>>>>> Thanks for sharing the detailed information about the job. For your question: how do you submit the job? After applying your yaml file, I think you will successfully launch a flink cluster with 1 JM and 1 TM. Then you would submit the pyflink job to the flink cluster. As the error you showed is a client-side error, it is possible that your client's env does not contain the flink-sql-kafka-connector jar, which may lead to the exception.
>>>>>>>>>> By the way, the "Table API" code in your mail is actually using the flink SQL API, so the flink-sql-kafka-connector jar is required, which is exactly what you have prepared. For pyflink's Table API, you can have a look at this document: https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Biao Geng
>>>>>>>>>>
>>>>>>>>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 03:10:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> I have set up Flink and Kafka containers using docker-compose, to test how Flink works for processing Kafka messages.
>>>>>>>>>>> I primarily want to check how the Table API works, but also how the Stream API would process the Kafka messages.
>>>>>>>>>>>
>>>>>>>>>>> I have included the main part of the docker-compose.yaml file I am using for the Flink cluster. I have commented out some of the JAR files I have tried, but I have mainly used the flink-sql-connector-kafka JAR. I would like to confirm whether I am using any wrong configuration, which JARs I should be using for each API, and whether I just need one JAR for both the Table and DataStream APIs.
>>>>>>>>>>> I have also included the Flink client module I have used for both the Table and the DataStream APIs, and the error messages.
>>>>>>>>>>> Any idea what is missing or if there is any configuration that seems wrong?
>>>>>>>>>>>
>>>>>>>>>>> docker-compose.yml:
>>>>>>>>>>>
>>>>>>>>>>> flink_jobmanager:
>>>>>>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>>>>>>   container_name: flink_jobmanager
>>>>>>>>>>>   ports:
>>>>>>>>>>>     - "8081:8081"
>>>>>>>>>>>   command: jobmanager
>>>>>>>>>>>   volumes:
>>>>>>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>>>>>>   environment:
>>>>>>>>>>>     - |
>>>>>>>>>>>       FLINK_PROPERTIES=
>>>>>>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>>>>>>   networks:
>>>>>>>>>>>     - standard
>>>>>>>>>>>   depends_on:
>>>>>>>>>>>     - kafka
>>>>>>>>>>>
>>>>>>>>>>> flink_taskmanager:
>>>>>>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>>>>>>   container_name: flink_taskmanager
>>>>>>>>>>>   volumes:
>>>>>>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>>>>>>   depends_on:
>>>>>>>>>>>     - kafka
>>>>>>>>>>>     - jobmanager
>>>>>>>>>>>   command: taskmanager
>>>>>>>>>>>   environment:
>>>>>>>>>>>     - |
>>>>>>>>>>>       FLINK_PROPERTIES=
>>>>>>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>>>>>>       taskmanager.numberOfTaskSlots: 2
>>>>>>>>>>>   networks:
>>>>>>>>>>>     - standard
>>>>>>>>>>>
>>>>>>>>>>> Table API:
>>>>>>>>>>>
>>>>>>>>>>> def process_table():
>>>>>>>>>>>
>>>>>>>>>>>     print('Running the Table job now.')
>>>>>>>>>>>     env_settings = (
>>>>>>>>>>>         EnvironmentSettings
>>>>>>>>>>>         .new_instance()
>>>>>>>>>>>         .in_streaming_mode()
>>>>>>>>>>>         # .with_configuration(config)
>>>>>>>>>>>         .build()
>>>>>>>>>>>     )
>>>>>>>>>>>     t_env = TableEnvironment.create(env_settings)
>>>>>>>>>>>
>>>>>>>>>>>     t_env.execute_sql(
>>>>>>>>>>>         f"""
>>>>>>>>>>>         CREATE TABLE kafka_test_logs (
>>>>>>>>>>>             action2 STRING
>>>>>>>>>>>         ) WITH (
>>>>>>>>>>>             'connector' = 'kafka',
>>>>>>>>>>>             'topic' = 'test_logs',
>>>>>>>>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>>>>>>>>             'properties.group.id' = 'flink_group',
>>>>>>>>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>>>>>>>>             'format' = 'json',
>>>>>>>>>>>             'json.ignore-parse-errors' = 'true'
>>>>>>>>>>>         )
>>>>>>>>>>>         """)
>>>>>>>>>>>
>>>>>>>>>>>     t_env.execute_sql("""
>>>>>>>>>>>         SELECT COUNT(*) AS message_count
>>>>>>>>>>>         FROM kafka_test_logs
>>>>>>>>>>>     """).print()
>>>>>>>>>>>
>>>>>>>>>>>     print('Table job has now completed.')
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_table()
>>>>>>>>>>>
>>>>>>>>>>> error:
>>>>>>>>>>>
>>>>>>>>>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>>>>>>>>> flink_app |
>>>>>>>>>>> flink_app | Available factory identifiers are:
>>>>>>>>>>> flink_app |
>>>>>>>>>>> flink_app | blackhole
>>>>>>>>>>> flink_app | datagen
>>>>>>>>>>> flink_app | filesystem
>>>>>>>>>>> flink_app | print
>>>>>>>>>>> flink_app | python-input-format
>>>>>>>>>>>
>>>>>>>>>>> The '/opt/flink/lib' directory has the JARs from the image as well as the JARs I added in the docker-compose.yml file.
>>>>>>>>>>>
>>>>>>>>>>> Stream API:
>>>>>>>>>>>
>>>>>>>>>>> def process_stream():
>>>>>>>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>>
>>>>>>>>>>>     properties = {
>>>>>>>>>>>         "bootstrap.servers": 'kafka:9092',
>>>>>>>>>>>         "group.id": "test_group"
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>>>>>>>>         topics='test_logs',
>>>>>>>>>>>         deserialization_schema=SimpleStringSchema(),
>>>>>>>>>>>         properties=properties)
>>>>>>>>>>>
>>>>>>>>>>>     data_stream = env.add_source(kafka_consumer)
>>>>>>>>>>>
>>>>>>>>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>>>>>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>>>>>>>>     parsed_stream.print()
>>>>>>>>>>>
>>>>>>>>>>>     env.execute("Kafka DataStream Test")
>>>>>>>>>>>
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_stream()
>>>>>>>>>>>
>>>>>>>>>>> error:
>>>>>>>>>>>
>>>>>>>>>>> TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
>>>>>>>>>>>
>>>>>>>>>>> I have used the JAR dependencies based on:
>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>>>>>>>>>
>>>>>>>>>>> Kind regards
>>>>>>>>>>> Phil
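(Not part of the original exchange, but a possible follow-up for the DataStream error above: FlinkKafkaConsumer has been deprecated in favour of KafkaSource in recent Flink releases, so the TypeError may simply mean that the class is absent from the flink-sql-connector-kafka-3.1.0-1.18.jar being used. A minimal PyFlink sketch using KafkaSource instead, with the topic and group names taken from the thread and the jar path assumed:

    from pyflink.common import WatermarkStrategy
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

    env = StreamExecutionEnvironment.get_execution_environment()
    # Make the connector visible on the client side; the path is illustrative
    # and assumes the same jar that is mounted into /opt/flink/lib above.
    env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")

    source = (
        KafkaSource.builder()
        .set_bootstrap_servers("kafka:9092")
        .set_topics("test_logs")
        .set_group_id("test_group")
        .set_starting_offsets(KafkaOffsetsInitializer.earliest())
        .set_value_only_deserializer(SimpleStringSchema())
        .build()
    )

    env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka_source").print()
    env.execute("Kafka DataStream Test")

As with the Table API job, the script would still need to be submitted with flink run, or have pipeline.jars set, for the session cluster to pick it up.)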