Thanks Biao.

Kind regards
Phil

> On 14 Apr 2024, at 18:04, Biao Geng <biaoge...@gmail.com> wrote:
> 
> Hi Phil,
> 
> You can check my github link 
> <https://github.com/bgeng777/pyflink-learning/tree/main/pyflink-using-docker> 
> for a detailed tutorial and example code :).
> 
> Best,
> Biao Geng
> 
> Phil Stavridis <phi...@gmail.com <mailto:phi...@gmail.com>> 于2024年4月12日周五 
> 19:10写道:
>> Hi Biao,
>> 
>> Thanks for looking into it and providing a detailed example.
>> I am not sure I am following some of the setup, and I just want to make sure 
>> I run exactly what you have tested without messing anything up, especially 
>> how you create the Docker images. Do you have this committed somewhere, or 
>> could you perhaps provide a JAR file with the implementation? Thanks.
>> 
>>> On 11 Apr 2024, at 19:43, Biao Geng <biaoge...@gmail.com 
>>> <mailto:biaoge...@gmail.com>> wrote:
>>> 
>>> Hi Phil,
>>> I tested it in my local Docker environment and found that we do need to use 
>>> "flink run" to submit the job to the session cluster. Simply using `python 
>>> xx.py` may just launch a local mini cluster and would not submit the job to 
>>> the cluster you created. Also note that all required dependencies (e.g. 
>>> the Kafka connector) need to be available on the cluster as well as on the 
>>> client.
>>> 
>>> Here is my test code, which works in my env; hope this helps:
>>> my flink-cluster-and-client.yaml:
>>> version: "2.2"
>>> services:
>>>   jobmanager:
>>>     image: pyflink:latest
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager        
>>> 
>>>   taskmanager:
>>>     image: pyflink:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2        
>>> 
>>>   client:
>>>     image: pyflink-client:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: /opt/flink/run.sh
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         rest.address: jobmanager
>>>         rest.bind-address: jobmanager
>>>     scale: 1
>>> 
>>> my read_kafka.py:
>>> from pyflink.table import (EnvironmentSettings, TableEnvironment)
>>> 
>>> def process_table():
>>>     env_settings = (
>>>         EnvironmentSettings
>>>         .new_instance()
>>>         .in_streaming_mode()
>>>         .build()
>>>     )
>>>     t_env = TableEnvironment.create(env_settings)
>>> 
>>> 
>>>     t_env.execute_sql(
>>>         f"""
>>>     CREATE TABLE kafka_input_topic (
>>>     action STRING
>>>     ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = 'input_topic',
>>>     'properties.bootstrap.servers' = 'kafka:9092',
>>>     'properties.group.id' = 'test',
>>>     'scan.startup.mode' = 'earliest-offset',
>>>     'format' = 'json',
>>>     'json.ignore-parse-errors' = 'true'
>>>     )
>>>     """)
>>>     
>>>     t_env.execute_sql(
>>>         f"""
>>>     CREATE TABLE print_sink (
>>>     action STRING
>>>     ) WITH (
>>>     'connector' = 'print'
>>>     )
>>>     """)
>>> 
>>>     t_env.execute_sql("""
>>>     INSERT INTO print_sink SELECT * FROM kafka_input_topic
>>>     """).print()
>>> 
>>> if __name__ == "__main__":
>>>     process_table()
>>> 
>>> 
>>> the run.sh is just one line to use `flink run`:
>>> #!/bin/bash
>>> flink run -py read_kafka.py
>>> 
>>> my Dockerfile to build the pyflink:latest image:
>>> 
>>> FROM flink:latest
>>> 
>>> # install python3 and pip3
>>> RUN apt-get update -y && \
>>>     apt-get install -y python3 python3-pip python3-dev && \
>>>     rm -rf /var/lib/apt/lists/*
>>> RUN ln -s /usr/bin/python3 /usr/bin/python
>>> 
>>> # install PyFlink
>>> 
>>> COPY apache-flink*.tar.gz /
>>> RUN pip3 install /apache-flink-libraries*.tar.gz && \
>>>     pip3 install /apache-flink*.tar.gz
>>> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
>>> 
>>> my Dockerfile to build the pyflink-client:latest image:
>>> FROM pyflink:latest
>>> COPY read_kafka.py /opt/flink/
>>> COPY run.sh /opt/flink
>>> RUN chmod 777 /opt/flink/run.sh
>>> 
>>> Possible output (dependent on the content of the kafka topic):
>>> <image.png>
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> Biao Geng <biaoge...@gmail.com <mailto:biaoge...@gmail.com>> 于2024年4月11日周四 
>>> 09:53写道:
>>>> Hi Phil,
>>>> "should this be run in the Flink JobManager?” It should be fine to run in 
>>>> your client container. 
>>>> "examples of running flink run in a Dockerized application" This sql 
>>>> client example 
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-sql-client-with-session-cluster>
>>>>  can be used as a good example.
>>>> It is somewhat strange that pipeline.jars does not work. I 
>>>> will try running the PyFlink-with-Kafka example in Docker locally later 
>>>> today. Besides that, I see that in all 3 of your containers the connector 
>>>> jar is located at 
>>>> /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar, so could you please 
>>>> try using `table_env.get_config().set("pipeline.classpaths", 
>>>> "file:///my/jar/path/connector.jar")` in your Python script?
>>>> 
>>>> Best,
>>>> Biao Geng
>>>> 
>>>> 
>>>> Phil Stavridis <phi...@gmail.com <mailto:phi...@gmail.com>> 于2024年4月11日周四 
>>>> 00:49写道:
>>>>> Hi Biao,
>>>>> 
>>>>> I will check out running with flink run, but should this be run in the 
>>>>> Flink JobManager? Would that mean that the Flink JobManager container 
>>>>> would need both Python installed and a copy of the flink_client.py 
>>>>> module? Are there some examples of running flink run in a Dockerized 
>>>>> application instead of the local CLI?
>>>>> I have added pipeline.jars in the flink_client module, but it is still 
>>>>> complaining that it cannot find kafka and the DynamicTableFactory class in 
>>>>> the classpath.
>>>>> Thanks for your feedback.
>>>>> 
>>>>> Kind regards
>>>>> Phil
>>>>> 
>>>>>> On 10 Apr 2024, at 16:21, Biao Geng <biaoge...@gmail.com 
>>>>>> <mailto:biaoge...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi Phil,
>>>>>> It should be totally OK to use `python -m flink_client.job`; it just 
>>>>>> seems to me that the Flink CLI is used more often.
>>>>>> And yes, you also need to add the SQL connector jar to the flink_client 
>>>>>> container. After putting the jar in your client container, add code 
>>>>>> like `table_env.get_config().set("pipeline.jars", 
>>>>>> "file:///path/in/container/to/connector.jar")` to your job.py. Then on 
>>>>>> the client side Flink can see the connector jar, and the jar will be 
>>>>>> uploaded to the cluster as well. See the doc on Jar Dependencies 
>>>>>> Management 
>>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/>
>>>>>> for more details.
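>>>>>> 
>>>>>> As a rough sketch of what job.py could look like (the container path is 
>>>>>> only a placeholder; use wherever you actually COPY the jar to):
>>>>>> 
>>>>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>>>>> 
>>>>>> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>>>>> # With pipeline.jars, the client finds the connector jar and also ships
>>>>>> # it to the cluster when the job is submitted (placeholder path below).
>>>>>> t_env.get_config().set(
>>>>>>     "pipeline.jars",
>>>>>>     "file:///path/in/container/to/flink-sql-connector-kafka-3.1.0-1.18.jar")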
>>>>>> 
>>>>>> Best,
>>>>>> Biao Geng
>>>>>> 
>>>>>> 
>>>>>> Phil Stavridis <phi...@gmail.com <mailto:phi...@gmail.com>> 
>>>>>> 于2024年4月10日周三 22:04写道:
>>>>>>> Hi Biao,
>>>>>>> 
>>>>>>> 1. I have a Flink client container like this:
>>>>>>> # Flink client
>>>>>>> flink_client:
>>>>>>>   container_name: flink_client
>>>>>>>   image: flink-client:local
>>>>>>>   build:
>>>>>>>     context: .
>>>>>>>     dockerfile: flink_client/Dockerfile
>>>>>>>   networks:
>>>>>>>     - standard
>>>>>>>   depends_on:
>>>>>>>     - jobmanager
>>>>>>>     - kafka
>>>>>>> 
>>>>>>> The flink_client/Dockerfile has this bash file, which only runs the 
>>>>>>> Flink client when the Flink cluster is up and running:
>>>>>>> Dockerfile:
>>>>>>> ENTRYPOINT ["./run_when_ready.sh"]
>>>>>>> 
>>>>>>> run_when_ready.sh:
>>>>>>> 
>>>>>>> #!/bin/bash
>>>>>>> 
>>>>>>> wait_for_jobmanager() {
>>>>>>>   echo "Waiting for Flink JobManager to be ready..."
>>>>>>>   while ! nc -z jobmanager 8081; do
>>>>>>>     sleep 1
>>>>>>>   done
>>>>>>>   echo "Flink JobManager is ready."
>>>>>>> }
>>>>>>> 
>>>>>>> # Call the function
>>>>>>> wait_for_jobmanager
>>>>>>> 
>>>>>>> # Now run the PyFlink job
>>>>>>> python -m flink_client.job
>>>>>>> flink_client.job.py is the file I 
>>>>>>> attached earlier, which has:
>>>>>>> 
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_table()
>>>>>>> 
>>>>>>> 
>>>>>>> So it runs that file which should submit the Table job to the Flink 
>>>>>>> cluster as that file runs:
>>>>>>>>>>> t_env.execute_sql("""
>>>>>>>>>>> SELECT COUNT(*) AS message_count
>>>>>>>>>>> FROM kafka_test_logs
>>>>>>>>>>> """).print()
>>>>>>> 
>>>>>>> 
>>>>>>> I don’t use flink run -t …
>>>>>>> Is this the wrong way to run PyFlink jobs?
>>>>>>> 
>>>>>>> Also, do I also need to add the flink-sql-connector JAR to the 
>>>>>>> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, 
>>>>>>> instead of relying on environment.execute_sql(…), run it 
>>>>>>> with flink run -t and the JAR file as you mentioned earlier?
>>>>>>> 
>>>>>>> 
>>>>>>>> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com 
>>>>>>>> <mailto:biaoge...@gmail.com>> wrote:
>>>>>>>> 
>>>>>>>> Hi Phil,
>>>>>>>> 
>>>>>>>> Your code looks good. I meant: how do you run the Python script? Maybe 
>>>>>>>> you are using the Flink CLI, i.e. running commands like `flink run -t .. -py 
>>>>>>>> job.py -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, 
>>>>>>>> the `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that, 
>>>>>>>> on the client side, Flink can generate the job graph successfully.
>>>>>>>> 
>>>>>>>> As for the second question: in your case, yes, your client’s env needs 
>>>>>>>> to have the flink-sql-kafka-connector JAR. You can check the doc 
>>>>>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
>>>>>>>> for more details. In short, your yaml shows that you are using 
>>>>>>>> session mode, which needs the connector jar to generate the job graph on 
>>>>>>>> the client side.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Biao Geng
>>>>>>>> 
>>>>>>>> Phil Stavridis <phi...@gmail.com <mailto:phi...@gmail.com>> 
>>>>>>>> 于2024年4月10日周三 18:14写道:
>>>>>>>>> Hi Biao,
>>>>>>>>> 
>>>>>>>>> For submitting the job, I run t_env.execute_sql.
>>>>>>>>> Shouldn’t that be sufficient for submitting the job using the Table 
>>>>>>>>> API with PyFlink? Isn’t that the recommended way for submitting and 
>>>>>>>>> running PyFlink jobs on a running Flink cluster? The Flink cluster 
>>>>>>>>> runs without issues, but there is no Running Job when the container 
>>>>>>>>> flink_app, which has the client code, runs.
>>>>>>>>> I could submit a job (a collect) using the Flink SQL console, and it 
>>>>>>>>> was available on the Running Jobs tab, but that was also not 
>>>>>>>>> ingesting data from the Kafka topic.
>>>>>>>>> Does my client’s env need to have the flink-sql-kafka-connector JAR? 
>>>>>>>>> Is that needed just in the classpath of the Flink JobManager and 
>>>>>>>>> TaskManager?
>>>>>>>>> 
>>>>>>>>> Kind regards
>>>>>>>>> Phil
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com 
>>>>>>>>>> <mailto:biaoge...@gmail.com>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi Phil,
>>>>>>>>>> 
>>>>>>>>>> Thanks for sharing the detailed information about the job. As for your 
>>>>>>>>>> question: how do you submit the job? After applying your yaml file, 
>>>>>>>>>> I think you will successfully launch a Flink cluster with 1 JM and 1 
>>>>>>>>>> TM. Then you would submit the PyFlink job to the Flink cluster. As 
>>>>>>>>>> the error you showed is a client-side error, it is possible that 
>>>>>>>>>> your client's env does not contain the flink-sql-kafka-connector jar, 
>>>>>>>>>> which may lead to the exception.
>>>>>>>>>> By the way, the "Table API" code in your mail is actually using the 
>>>>>>>>>> Flink SQL API, so the flink-sql-kafka-connector jar is required, 
>>>>>>>>>> which is exactly what you have prepared. For PyFlink's Table API, 
>>>>>>>>>> you can have a look at this document: 
>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Biao Geng
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Phil Stavridis <phi...@gmail.com <mailto:phi...@gmail.com>> 
>>>>>>>>>> 于2024年4月10日周三 03:10写道:
>>>>>>>>>>> Hello,
>>>>>>>>>>> 
>>>>>>>>>>> I have set up Flink and Kafka containers using docker-compose, for 
>>>>>>>>>>> testing how Flink works for processing Kafka messages.
>>>>>>>>>>> I primarily want to check how the Table API works but also how the 
>>>>>>>>>>> Stream API would process the Kafka messages.
>>>>>>>>>>> 
>>>>>>>>>>> I have included the main part of the docker-compose.yaml file I am 
>>>>>>>>>>> using for the Flink cluster. I have commented out some of the JAR 
>>>>>>>>>>> files I have tried out but I have mainly used the 
>>>>>>>>>>> flink-sql-connector-kafka JAR. I would like to confirm whether I am 
>>>>>>>>>>> using any wrong configuration, which JARs I should be using for 
>>>>>>>>>>> each API, and whether I just need one JAR for both the Table and 
>>>>>>>>>>> DataStream APIs?
>>>>>>>>>>> I have also included the Flink client module I have used for both 
>>>>>>>>>>> the Table and the Datastream APIs and the error messages.
>>>>>>>>>>> Any idea what is missing or if there is any configuration that 
>>>>>>>>>>> seems wrong?
>>>>>>>>>>> 
>>>>>>>>>>> docker-compose.yml
>>>>>>>>>>> flink_jobmanager:
>>>>>>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>>>>>>   container_name: flink_jobmanager
>>>>>>>>>>>   ports:
>>>>>>>>>>>     - "8081:8081"
>>>>>>>>>>>   command: jobmanager
>>>>>>>>>>>   volumes:
>>>>>>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>>>>>>   environment:
>>>>>>>>>>>     - |
>>>>>>>>>>>       FLINK_PROPERTIES=
>>>>>>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>>>>>>   networks:
>>>>>>>>>>>     - standard
>>>>>>>>>>>   depends_on:
>>>>>>>>>>>     - kafka
>>>>>>>>>>> 
>>>>>>>>>>> flink_taskmanager:
>>>>>>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>>>>>>   container_name: flink_taskmanager
>>>>>>>>>>>   volumes:
>>>>>>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>>>>>>   depends_on:
>>>>>>>>>>>     - kafka
>>>>>>>>>>>     - jobmanager
>>>>>>>>>>>   command: taskmanager
>>>>>>>>>>>   environment:
>>>>>>>>>>>     - |
>>>>>>>>>>>       FLINK_PROPERTIES=
>>>>>>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>>>>>>       taskmanager.numberOfTaskSlots: 2
>>>>>>>>>>>   networks:
>>>>>>>>>>>     - standard
>>>>>>>>>>> Table API:
>>>>>>>>>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>>>>>>>>>> 
>>>>>>>>>>> def process_table():
>>>>>>>>>>>     print('Running the Table job now.')
>>>>>>>>>>>     env_settings = (
>>>>>>>>>>>         EnvironmentSettings
>>>>>>>>>>>         .new_instance()
>>>>>>>>>>>         .in_streaming_mode()
>>>>>>>>>>>         # .with_configuration(config)
>>>>>>>>>>>         .build()
>>>>>>>>>>>     )
>>>>>>>>>>>     t_env = TableEnvironment.create(env_settings)
>>>>>>>>>>> 
>>>>>>>>>>>     t_env.execute_sql(
>>>>>>>>>>>         f"""
>>>>>>>>>>>         CREATE TABLE kafka_test_logs (
>>>>>>>>>>>             action2 STRING
>>>>>>>>>>>         ) WITH (
>>>>>>>>>>>             'connector' = 'kafka',
>>>>>>>>>>>             'topic' = 'test_logs',
>>>>>>>>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>>>>>>>>             'properties.group.id' = 'flink_group',
>>>>>>>>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>>>>>>>>             'format' = 'json',
>>>>>>>>>>>             'json.ignore-parse-errors' = 'true'
>>>>>>>>>>>         )
>>>>>>>>>>>         """)
>>>>>>>>>>> 
>>>>>>>>>>>     t_env.execute_sql("""
>>>>>>>>>>>         SELECT COUNT(*) AS message_count
>>>>>>>>>>>         FROM kafka_test_logs
>>>>>>>>>>>     """).print()
>>>>>>>>>>> 
>>>>>>>>>>>     print('Table job has now completed.')
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_table()
>>>>>>>>>>> error:
>>>>>>>>>>> flink_app | Caused by: 
>>>>>>>>>>> org.apache.flink.table.api.ValidationException: Could not find any 
>>>>>>>>>>> factory for identifier 'kafka' that implements 
>>>>>>>>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the 
>>>>>>>>>>> classpath.
>>>>>>>>>>> flink_app |
>>>>>>>>>>> flink_app | Available factory identifiers are:
>>>>>>>>>>> flink_app |
>>>>>>>>>>> flink_app | blackhole
>>>>>>>>>>> flink_app | datagen
>>>>>>>>>>> flink_app | filesystem
>>>>>>>>>>> flink_app | print
>>>>>>>>>>> flink_app | python-input-format
>>>>>>>>>>> The '/opt/flink/lib' directory has the JARs from the image as well as the 
>>>>>>>>>>> JARs I added in the docker-compose.yml file.
>>>>>>>>>>> 
>>>>>>>>>>> Stream API:
>>>>>>>>>>> import json
>>>>>>>>>>> 
>>>>>>>>>>> from pyflink.common.serialization import SimpleStringSchema
>>>>>>>>>>> from pyflink.common.typeinfo import Types
>>>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>>>> from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
>>>>>>>>>>> 
>>>>>>>>>>> def process_stream():
>>>>>>>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>> 
>>>>>>>>>>>     properties = {
>>>>>>>>>>>         "bootstrap.servers": 'kafka:9092',
>>>>>>>>>>>         "group.id": "test_group"
>>>>>>>>>>>     }
>>>>>>>>>>> 
>>>>>>>>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>>>>>>>>         topics='test_logs',
>>>>>>>>>>>         deserialization_schema=SimpleStringSchema(),
>>>>>>>>>>>         properties=properties)
>>>>>>>>>>> 
>>>>>>>>>>>     data_stream = env.add_source(kafka_consumer)
>>>>>>>>>>> 
>>>>>>>>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>>>>>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>>>>>>>>     parsed_stream.print()
>>>>>>>>>>> 
>>>>>>>>>>>     env.execute("Kafka DataStream Test")
>>>>>>>>>>> 
>>>>>>>>>>> if __name__ == "__main__":
>>>>>>>>>>>     process_stream()
>>>>>>>>>>> error:
>>>>>>>>>>> TypeError: Could not found the Java class 
>>>>>>>>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. 
>>>>>>>>>>> The Java dependencies could be specified via command line argument 
>>>>>>>>>>> '--jarfile' or the config option 'pipeline.jars'
>>>>>>>>>>> 
>>>>>>>>>>> I have used the JAR dependencies based on:
>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
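>>>>>>>>>>> 
>>>>>>>>>>> In case it helps, this is roughly how I understand the 'pipeline.jars' hint 
>>>>>>>>>>> from the error would look in the DataStream job (the jar path is just where 
>>>>>>>>>>> I mount it in the containers; I have not confirmed this is correct):
>>>>>>>>>>> 
>>>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>>>> 
>>>>>>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>>> # Register the Kafka connector jar with the client so the Java class
>>>>>>>>>>> # can be found (path assumed from my docker-compose volume mounts).
>>>>>>>>>>> env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")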
>>>>>>>>>>> 
>>>>>>>>>>> Kind regards
>>>>>>>>>>> Phil
>>>>>>>>> 
>>>>>>> 
>>>>> 
>> 
