Hi Phil,

You can check my github link
for a detailed tutorial and example codes :).

Biao Geng

Phil Stavridis <phi...@gmail.com> 于2024年4月12日周五 19:10写道:

> Hi Biao,
> Thanks for looking into it and providing a detailed example.
> I am not sure I am following some of the setup and I just want to make
> sure I run what you have tested without messing up anything with the setup.
> Especially for how you create the Docker images etc. Do you have this
> committed somewhere or could you perhaps provide a JAR file with the
> implementation? Thanks.
> On 11 Apr 2024, at 19:43, Biao Geng <biaoge...@gmail.com> wrote:
> Hi Phil,
> I test it on my local docker environment and find that we do need to use
> "flink run" to submit the job to the session cluster. Simply using `python
> xx.py` may just launch a local mini cluster and would not submit the job to
> the cluster you created. Also note, that all required dependencies (e.g.
> the kafka connector) need to be available in the cluster as well as the
> client.
> Here is my test codes which work in my env, hope this helps:
> my flink-cluster-and-client.yaml:
> version: "2.2"
> services:
> jobmanager:
> image: pyflink:latest
> ports:
> - "8081:8081"
> command: jobmanager
> environment:
> - |
> jobmanager.rpc.address: jobmanager
> taskmanager:
> image: pyflink:latest
> depends_on:
> - jobmanager
> command: taskmanager
> scale: 1
> environment:
> - |
> jobmanager.rpc.address: jobmanager
> taskmanager.numberOfTaskSlots: 2
> client:
> image: pyflink-client:latest
> depends_on:
> - jobmanager
> command: /opt/flink/run.sh
> environment:
> - |
> rest.address: jobmanager
> rest.bind-address: jobmanager
> scale: 1
> my read_kafka.py:
> from pyflink.table import (EnvironmentSettings, TableEnvironment)
> def process_table():
> env_settings = (
> EnvironmentSettings
> .new_instance()
> .in_streaming_mode()
> .build()
> )
> t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
> t_env.execute_sql(
> f"""
> CREATE TABLE kafka_input_topic (
> action STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'input_topic',
> 'properties.bootstrap.servers' = 'kafka:9092',
> 'properties.group.id' = 'test',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'true'
> )
> """)
> t_env.execute_sql(
> f"""
> CREATE TABLE print_sink (
> action STRING
> ) WITH (
> 'connector' = 'print'
> )
> """)
> t_env.execute_sql("""
> INSERT INTO print_sink SELECT * FROM kafka_input_topic
> """).print()
> if __name__ == "__main__":
> process_table()
> the run.sh is just one line to use `flink run`:
> #!/bin/bash
> flink run -py read_kafka.py
> my dockerfile to build pyflink:lastest image:
> FROM flink:latest
> # install python3 and pip3
> RUN apt-get update -y && \
> apt-get install -y python3 python3-pip python3-dev && rm -rf
> /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
> # install PyFlink
> COPY apache-flink*.tar.gz /
> RUN pip3 install /apache-flink-libraries*.tar.gz && pip3 install
> /apache-flink*.tar.gz
> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
> my dockerfile to build pyflink-client:lastest image:
> FROM pyflink:latest
> COPY read_kafka.py /opt/flink/
> COPY run.sh /opt/flink
> RUN chmod 777 /opt/flink/run.sh
> Possible output (dependent on the content of the kafka topic):
> <image.png>
> Best,
> Biao Geng
> Biao Geng <biaoge...@gmail.com> 于2024年4月11日周四 09:53写道:
>> Hi Phil,
>> "should this be run in the Flink JobManager?” It should be fine to run in
>> your client container.
>> "examples of running flink run in a Dockerized application" This sql
>> client example
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-sql-client-with-session-cluster>
>>  can
>> be used as a good example.
>> It is somehow strange to learn that the pipeline.jars does not work. I
>> will try to run the pyflink using kafka example in docker locally later
>> today. Besides that, I see that in your all 3 containers, the connector jar
>> are all located in /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar,
>> could you please give a try of using `table_env.get_config().set(
>> "pipeline.classpaths", "file:///my/jar/path/connector.jar")` in your
>> python script?
>> Best,
>> Biao Geng
>> Phil Stavridis <phi...@gmail.com> 于2024年4月11日周四 00:49写道:
>>> Hi Biao,
>>> I will check out running with flink run, but should this be run in the
>>> Flink JobManager? Would that mean that the container for the Flink
>>> JobManager would require both Python installed and a copy of the
>>> flink_client.py module? Are there some examples of running flink run in a
>>> Dockerized application instead of the local CLI?
>>> I have added the pipeline.jars in the flink_client module but it is
>>> still complaining about not finding kafka and the DynamicTable factory
>>> class in the class path.
>>> Thanks for your feedback.
>>> Kind regards
>>> Phil
>>> On 10 Apr 2024, at 16:21, Biao Geng <biaoge...@gmail.com> wrote:
>>> Hi Phil,
>>> It should be totally ok to use `python -m flink_client.job`. It just
>>> seems to me that the flink cli is being used more often.
>>> And yes, you also need to add the sql connector jar to the flink_client
>>> container. After putting the jar in your client container, add codes
>>> like `table_env.get_config().set("pipeline.jars",
>>> "file:///path/in/container/to/connector.jar")` to your job.py. Then in the
>>> client side, the flink can see the connector jar and the jar would be
>>> uploaded to the cluster as well. See the doc of Jar Dependencies
>>> Management
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/>
>>>  for
>>> more details.
>>> Best,
>>> Biao Geng
>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 22:04写道:
>>>> Hi Biao,
>>>> 1. I have a Flink client container like this:
>>>> # Flink client
>>>> flink_client:
>>>> container_name: flink_client
>>>> image: flink-client:local
>>>> build:
>>>> context: .
>>>> dockerfile: flink_client/Dockerfile
>>>> networks:
>>>> - standard
>>>> depends_on:
>>>> - jobmanager
>>>> - Kafka
>>>> The flink_client/Dockerfile has this bash file which only runs the
>>>> Flink client, when the Flink cluster is up an running:
>>>> Dockerfile:
>>>> ENTRYPOINT [“./run_when_ready.sh”]
>>>> run_when_ready.sh:
>>>> #!/bin/bash
>>>> wait_for_jobmanager() {
>>>> echo "Waiting for Flink JobManager to be ready..."
>>>> while ! nc -z jobmanager 8081; do
>>>> sleep 1
>>>> done
>>>> echo "Flink JobManager is ready."
>>>> }
>>>> # Call the function
>>>> wait_for_jobmanager
>>>> # Now run the PyFlink job
>>>> python -m flink_client.job
>>>> flink_client.job.py is the file I attached earlier which has:
>>>> if __name__ == "__main__":
>>>>>> process_table()
>>>> So it runs that file which should submit the Table job to the Flink
>>>> cluster as that file runs:
>>>> t_env.execute_sql("""
>>>>>> SELECT COUNT(*) AS message_count
>>>>>> FROM kafka_test_logs
>>>>>> """).print()
>>>> I don’t use flink run -t …
>>>> Is this the wrong way to run PyFlink jobs?
>>>> Also, do I need to also add the flink-sql-connector.jar to the
>>>> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/ and instead
>>>> of relying on the environment.execute_sql(…), I need to run it with
>>>> flink_run -t and the JAR file as you mentioned earlier?
>>>> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
>>>> Hi Phil,
>>>> Your codes look good. I mean how do you run the python script. Maybe
>>>> you are using flink cli? i.e. run commands like ` flink run -t .. -py
>>>> job.py -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, the
>>>> `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that in client
>>>> side, flink can generate the job graph successfully.
>>>> As for the second question, in your case, yes, your client’s env need
>>>> to have the flink-sql-kafka-connector JAR. You can check the doc
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
>>>>  for
>>>> more details. In short, your yaml shows that you are using the session
>>>> mode, which needs connector jar to generate job graph in the client side.
>>>> Best,
>>>> Biao Geng
>>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 18:14写道:
>>>>> Hi Biao,
>>>>> For submitting the job, I run t_env.execute_sql.
>>>>> Shouldn’t that be sufficient for submitting the job using the Table
>>>>> API with PyFlink? Isn’t that the recommended way for submitting and 
>>>>> running
>>>>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
>>>>> issues, but there is no Running Job when the container flink_app, that has
>>>>> the client code, runs.
>>>>> I could submit a job, collect, using the Flink SQL console, and it was
>>>>> available on the Running Jobs tab, but that was also not ingesting data
>>>>> from the Kafka topic.
>>>>> Does my client’s env need to have the flink-sql-kafka-connector JAR?
>>>>> Is that needed just in the class path of the Flink JobManager and
>>>>> TaskManager?
>>>>> Kind regards
>>>>> Phil
>>>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>>>> Hi Phil,
>>>>> Thanks for sharing the detailed information of the job. For you
>>>>> question, how to you submit the job? After applying your yaml file, I 
>>>>> think
>>>>> you will successfully launch a flink cluster with 1 JM and 1 TM. Then you
>>>>> would submit the pyflink job to the flink cluster. As the error you showed
>>>>> is the client-side error, it is possible that your client's env does not
>>>>> contain the flink-sql-kafka-connector jar which may lead to the exception.
>>>>> By the way, the "Table API" codes in your mail is actually using the
>>>>> flink SQL API, so the flink-sql-kafka-connector jar is required, which is
>>>>> exactly what you have prepared. For pyflink's table API, you can have a
>>>>> look at this document:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>>>> Best,
>>>>> Biao Geng
>>>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 03:10写道:
>>>>>> Hello,
>>>>>> I have set up Flink and Kafka containers using docker-compose, for
>>>>>> testing how Flink works for processing Kafka messages.
>>>>>> I primarily want to check how the Table API works but also how the
>>>>>> Stream API would process the Kafka messages.
>>>>>> I have included the main part of the docker-compose.yaml file I am
>>>>>> using for the Flink cluster. I have commented out some of the JAR files I
>>>>>> have tried out but I have mainly used the flink-sql-connector-kafka JAR. 
>>>>>> I
>>>>>> would like to confirm if I am using any wrong configurations, which JARs
>>>>>> should I be using for each API and if just need to use one JAR for both
>>>>>> Table and Datastream APIs?
>>>>>> I have also included the Flink client module I have used for both the
>>>>>> Table and the Datastream APIs and the error messages.
>>>>>> Any idea what is missing or if there is any configuration that seems
>>>>>> wrong?
>>>>>> docker-compose.yml
>>>>>> flink_jobmanager:
>>>>>> image: flink:1.18.1-scala_2.12
>>>>>> container_name: flink_jobmanager
>>>>>> ports:
>>>>>> - "8081:8081"
>>>>>> command: jobmanager
>>>>>> volumes:
>>>>>> -
>>>>>> ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>> # -
>>>>>> ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>> # -
>>>>>> ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>> # -
>>>>>> ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>> environment:
>>>>>> - |
>>>>>> jobmanager.rpc.address: jobmanager
>>>>>> networks:
>>>>>> - standard
>>>>>> depends_on:
>>>>>> - kafka
>>>>>> flink_taskmanager:
>>>>>> image: flink:1.18.1-scala_2.12
>>>>>> container_name: flink_taskmanager
>>>>>> volumes:
>>>>>> -
>>>>>> ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>> # -
>>>>>> ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>> # -
>>>>>> ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>> # -
>>>>>> ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>> depends_on:
>>>>>> - kafka
>>>>>> - jobmanager
>>>>>> command: taskmanager
>>>>>> environment:
>>>>>> - |
>>>>>> jobmanager.rpc.address: jobmanager
>>>>>> taskmanager.numberOfTaskSlots: 2
>>>>>> networks:
>>>>>> - standard
>>>>>> Table API:
>>>>>> def process_table():
>>>>>> print('Running the Table job now.’)
>>>>>> env_settings = (
>>>>>> EnvironmentSettings
>>>>>> .new_instance()
>>>>>> .in_streaming_mode()
>>>>>> # .with_configuration(config)
>>>>>> .build()
>>>>>> )
>>>>>> t_env = TableEnvironment.create(env_settings)
>>>>>> t_env.execute_sql(
>>>>>> f"""
>>>>>> CREATE TABLE kafka_test_logs (
>>>>>> action2 STRING
>>>>>> ) WITH (
>>>>>> 'connector' = 'kafka',
>>>>>> 'topic' = 'test_logs',
>>>>>> 'properties.bootstrap.servers' = 'kafka:9092',
>>>>>> 'properties.group.id' = 'flink_group',
>>>>>> 'scan.startup.mode' = 'earliest-offset',
>>>>>> 'format' = 'json',
>>>>>> 'json.ignore-parse-errors' = 'true'
>>>>>> )
>>>>>> """)
>>>>>> t_env.execute_sql("""
>>>>>> SELECT COUNT(*) AS message_count
>>>>>> FROM kafka_test_logs
>>>>>> """).print()
>>>>>> print('Table job has now completed.')
>>>>>> if __name__ == "__main__":
>>>>>> process_table()
>>>>>> error:
>>>>>> link_app | Caused by: org.apache.flink.table.api.ValidationException:
>>>>>> Could not find any factory for identifier 'kafka' that implements
>>>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>>>> flink_app |
>>>>>> flink_app | Available factory identifiers are:
>>>>>> flink_app |
>>>>>> flink_app | blackhole
>>>>>> flink_app | datagen
>>>>>> flink_app | filesystem
>>>>>> flink_app | print
>>>>>> flink_app | python-input-format
>>>>>> The '/opt/flink/lib' has the JARs from both the image and the JARs i
>>>>>> added in the docker-compose.yml file.
>>>>>> Stream API:
>>>>>> def process_stream():
>>>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> properties = {
>>>>>> "bootstrap.servers": 'kafka:9092',
>>>>>> "group.id": "test_group"
>>>>>> }
>>>>>> kafka_consumer = FlinkKafkaConsumer(
>>>>>> topics='test_logs',
>>>>>> deserialization_schema=SimpleStringSchema(),
>>>>>> properties=properties)
>>>>>> data_stream = env.add_source(kafka_consumer)
>>>>>> parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>>> output_type=Types.ROW([Types.STRING()]))
>>>>>> parsed_stream.print()
>>>>>> env.execute("Kafka DataStream Test”)
>>>>>> if __name__ == "__main__":
>>>>>> process_stream()
>>>>>> error:
>>>>>> TypeError: Could not found the Java class
>>>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The 
>>>>>> Java
>>>>>> dependencies could be specified via command line argument '--jarfile' or
>>>>>> the config option 'pipeline.jars'
>>>>>> I have used the JAR dependencies based on:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>>>> Kind regards
>>>>>> Phil

