Hi Phil,

You can check my GitHub link <https://github.com/bgeng777/pyflink-learning/tree/main/pyflink-using-docker> for a detailed tutorial and example code :).
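In case it is useful before you dig into the repo, here is a rough sketch of the in-script jar registration discussed below (the jar path is only a placeholder; point it at wherever the connector jar actually lives in your client container):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Make the Kafka SQL connector visible to the client so the job graph can be
# generated; the jar must also exist at this path inside the client container.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")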
Best,
Biao Geng

Phil Stavridis <phi...@gmail.com> 于2024年4月12日周五 19:10写道:

> Hi Biao,
>
> Thanks for looking into it and providing a detailed example.
> I am not sure I am following some of the setup and I just want to make
> sure I run what you have tested without messing up anything with the setup,
> especially how you create the Docker images etc. Do you have this
> committed somewhere, or could you perhaps provide a JAR file with the
> implementation? Thanks.
>
> On 11 Apr 2024, at 19:43, Biao Geng <biaoge...@gmail.com> wrote:
>
> Hi Phil,
> I tested it in my local docker environment and found that we do need to use
> "flink run" to submit the job to the session cluster. Simply using `python
> xx.py` may just launch a local mini cluster and would not submit the job to
> the cluster you created. Also note that all required dependencies (e.g.
> the kafka connector) need to be available in the cluster as well as in the
> client.
>
> Here is my test code, which works in my env; hope this helps.
>
> my flink-cluster-and-client.yaml:
>
> version: "2.2"
> services:
>   jobmanager:
>     image: pyflink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>
>   taskmanager:
>     image: pyflink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
>   client:
>     image: pyflink-client:latest
>     depends_on:
>       - jobmanager
>     command: /opt/flink/run.sh
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         rest.address: jobmanager
>         rest.bind-address: jobmanager
>     scale: 1
>
> my read_kafka.py:
>
> from pyflink.table import (EnvironmentSettings, TableEnvironment)
>
> def process_table():
>     env_settings = (
>         EnvironmentSettings
>         .new_instance()
>         .in_streaming_mode()
>         .build()
>     )
>     t_env = TableEnvironment.create(env_settings)
>
>     t_env.execute_sql(
>         f"""
>         CREATE TABLE kafka_input_topic (
>             action STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'input_topic',
>             'properties.bootstrap.servers' = 'kafka:9092',
>             'properties.group.id' = 'test',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'json',
>             'json.ignore-parse-errors' = 'true'
>         )
>         """)
>     t_env.execute_sql(
>         f"""
>         CREATE TABLE print_sink (
>             action STRING
>         ) WITH (
>             'connector' = 'print'
>         )
>         """)
>
>     t_env.execute_sql("""
>         INSERT INTO print_sink SELECT * FROM kafka_input_topic
>     """).print()
>
> if __name__ == "__main__":
>     process_table()
>
> the run.sh is just one line that uses `flink run`:
>
> #!/bin/bash
> flink run -py read_kafka.py
>
> my dockerfile to build the pyflink:latest image:
>
> FROM flink:latest
>
> # install python3 and pip3
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip python3-dev && \
>     rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> # install PyFlink
> COPY apache-flink*.tar.gz /
> RUN pip3 install /apache-flink-libraries*.tar.gz && \
>     pip3 install /apache-flink*.tar.gz
> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
>
> my dockerfile to build the pyflink-client:latest image:
>
> FROM pyflink:latest
> COPY read_kafka.py /opt/flink/
> COPY run.sh /opt/flink
> RUN chmod 777 /opt/flink/run.sh
>
> Possible output (dependent on the content of the kafka topic):
> <image.png>
>
> Best,
> Biao Geng
>
> Biao Geng <biaoge...@gmail.com> 于2024年4月11日周四 09:53写道:
>
>> Hi Phil,
>> "should this be run in the Flink JobManager?" It should be fine to run in your client container.
>> "examples of running flink run in a Dockerized application" This sql
>> client example
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#flink-sql-client-with-session-cluster>
>> can be used as a good example.
>> It is somewhat strange to learn that pipeline.jars does not work. I will
>> try to run the pyflink-with-kafka example in docker locally later today.
>> Besides that, I see that in all 3 of your containers the connector jar is
>> located at /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar;
>> could you please try using `table_env.get_config().set(
>> "pipeline.classpaths", "file:///my/jar/path/connector.jar")` in your
>> python script?
>>
>> Best,
>> Biao Geng
>>
>> Phil Stavridis <phi...@gmail.com> 于2024年4月11日周四 00:49写道:
>>
>>> Hi Biao,
>>>
>>> I will check out running with flink run, but should this be run in the
>>> Flink JobManager? Would that mean that the container for the Flink
>>> JobManager would require both Python installed and a copy of the
>>> flink_client.py module? Are there some examples of running flink run in a
>>> Dockerized application instead of the local CLI?
>>> I have added the pipeline.jars in the flink_client module but it is
>>> still complaining about not finding kafka and the DynamicTableFactory
>>> class on the class path.
>>> Thanks for your feedback.
>>>
>>> Kind regards
>>> Phil
>>>
>>> On 10 Apr 2024, at 16:21, Biao Geng <biaoge...@gmail.com> wrote:
>>>
>>> Hi Phil,
>>> It should be totally OK to use `python -m flink_client.job`; it just
>>> seems to me that the flink cli is used more often.
>>> And yes, you also need to add the sql connector jar to the flink_client
>>> container. After putting the jar in your client container, add code
>>> like `table_env.get_config().set("pipeline.jars",
>>> "file:///path/in/container/to/connector.jar")` to your job.py. Then on the
>>> client side, flink can see the connector jar, and the jar will be
>>> uploaded to the cluster as well. See the doc of Jar Dependencies
>>> Management
>>> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/>
>>> for more details.
>>>
>>> Best,
>>> Biao Geng
>>>
>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 22:04写道:
>>>
>>>> Hi Biao,
>>>>
>>>> 1. I have a Flink client container like this:
>>>>
>>>> # Flink client
>>>> flink_client:
>>>>   container_name: flink_client
>>>>   image: flink-client:local
>>>>   build:
>>>>     context: .
>>>>     dockerfile: flink_client/Dockerfile
>>>>   networks:
>>>>     - standard
>>>>   depends_on:
>>>>     - jobmanager
>>>>     - kafka
>>>>
>>>> The flink_client/Dockerfile runs this bash file, which only starts the
>>>> Flink client once the Flink cluster is up and running:
>>>>
>>>> Dockerfile:
>>>> ENTRYPOINT ["./run_when_ready.sh"]
>>>>
>>>> run_when_ready.sh:
>>>>
>>>> #!/bin/bash
>>>>
>>>> wait_for_jobmanager() {
>>>>     echo "Waiting for Flink JobManager to be ready..."
>>>>     while ! nc -z jobmanager 8081; do
>>>>         sleep 1
>>>>     done
>>>>     echo "Flink JobManager is ready."
>>>> }
>>>>
>>>> # Call the function
>>>> wait_for_jobmanager
>>>>
>>>> # Now run the PyFlink job
>>>> python -m flink_client.job
>>>>
>>>> flink_client.job.py is the file I attached earlier, which has:
>>>>
>>>> if __name__ == "__main__":
>>>>     process_table()
>>>>
>>>> So it runs that file, which should submit the Table job to the Flink
>>>> cluster, as that file runs:
>>>>
>>>> t_env.execute_sql("""
>>>>     SELECT COUNT(*) AS message_count
>>>>     FROM kafka_test_logs
>>>> """).print()
>>>>
>>>> I don't use flink run -t …
>>>> Is this the wrong way to run PyFlink jobs?
>>>>
>>>> Also, do I need to add the flink-sql-connector.jar to the
>>>> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and instead
>>>> of relying on environment.execute_sql(…), do I need to run it with
>>>> flink run -t and the JAR file as you mentioned earlier?
>>>>
>>>> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
>>>>
>>>> Hi Phil,
>>>>
>>>> Your codes look good. I meant: how do you run the python script? Maybe
>>>> you are using the flink cli, i.e. running commands like `flink run -t .. -py
>>>> job.py -j /path/to/flink-sql-kafka-connector.jar`. If that's the case, the
>>>> `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that on the
>>>> client side, flink can generate the job graph successfully.
>>>>
>>>> As for the second question, in your case, yes, your client's env needs
>>>> to have the flink-sql-kafka-connector JAR. You can check the doc
>>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
>>>> for more details. In short, your yaml shows that you are using session
>>>> mode, which needs the connector jar to generate the job graph on the client side.
>>>>
>>>> Best,
>>>> Biao Geng
>>>>
>>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 18:14写道:
>>>>
>>>>> Hi Biao,
>>>>>
>>>>> For submitting the job, I run t_env.execute_sql.
>>>>> Shouldn't that be sufficient for submitting the job using the Table
>>>>> API with PyFlink? Isn't that the recommended way for submitting and running
>>>>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
>>>>> issues, but there is no Running Job when the container flink_app, which has
>>>>> the client code, runs.
>>>>> I could submit a job, collect, using the Flink SQL console, and it was
>>>>> available on the Running Jobs tab, but that was also not ingesting data
>>>>> from the Kafka topic.
>>>>> Does my client's env need to have the flink-sql-kafka-connector JAR?
>>>>> Is that needed just in the class path of the Flink JobManager and
>>>>> TaskManager?
>>>>>
>>>>> Kind regards
>>>>> Phil
>>>>>
>>>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>>>>
>>>>> Hi Phil,
>>>>>
>>>>> Thanks for sharing the detailed information of the job. For your
>>>>> question: how do you submit the job? After applying your yaml file, I think
>>>>> you will successfully launch a flink cluster with 1 JM and 1 TM. Then you
>>>>> would submit the pyflink job to the flink cluster. As the error you showed
>>>>> is a client-side error, it is possible that your client's env does not
>>>>> contain the flink-sql-kafka-connector jar, which may lead to the exception.
>>>>> By the way, the "Table API" code in your mail is actually using the
>>>>> flink SQL API, so the flink-sql-kafka-connector jar is required, which is
>>>>> exactly what you have prepared.
>>>>> For pyflink's table API, you can have a look at this document:
>>>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>>>>
>>>>> Best,
>>>>> Biao Geng
>>>>>
>>>>> Phil Stavridis <phi...@gmail.com> 于2024年4月10日周三 03:10写道:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have set up Flink and Kafka containers using docker-compose, for
>>>>>> testing how Flink works for processing Kafka messages.
>>>>>> I primarily want to check how the Table API works, but also how the
>>>>>> Stream API would process the Kafka messages.
>>>>>>
>>>>>> I have included the main part of the docker-compose.yaml file I am
>>>>>> using for the Flink cluster. I have commented out some of the JAR files I
>>>>>> have tried out, but I have mainly used the flink-sql-connector-kafka JAR.
>>>>>> I would like to confirm whether I am using any wrong configurations,
>>>>>> which JARs I should be using for each API, and whether I just need to use
>>>>>> one JAR for both the Table and DataStream APIs.
>>>>>> I have also included the Flink client module I have used for both the
>>>>>> Table and the DataStream APIs, and the error messages.
>>>>>> Any idea what is missing or if there is any configuration that seems
>>>>>> wrong?
>>>>>>
>>>>>> docker-compose.yml:
>>>>>>
>>>>>> flink_jobmanager:
>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>   container_name: flink_jobmanager
>>>>>>   ports:
>>>>>>     - "8081:8081"
>>>>>>   command: jobmanager
>>>>>>   volumes:
>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>   environment:
>>>>>>     - |
>>>>>>       FLINK_PROPERTIES=
>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>   networks:
>>>>>>     - standard
>>>>>>   depends_on:
>>>>>>     - kafka
>>>>>>
>>>>>> flink_taskmanager:
>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>   container_name: flink_taskmanager
>>>>>>   volumes:
>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>   depends_on:
>>>>>>     - kafka
>>>>>>     - jobmanager
>>>>>>   command: taskmanager
>>>>>>   environment:
>>>>>>     - |
>>>>>>       FLINK_PROPERTIES=
>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>       taskmanager.numberOfTaskSlots: 2
>>>>>>   networks:
>>>>>>     - standard
>>>>>>
>>>>>> Table API:
>>>>>>
>>>>>> def process_table():
>>>>>>     print('Running the Table job now.')
>>>>>>     env_settings = (
>>>>>>         EnvironmentSettings
>>>>>>         .new_instance()
>>>>>>         .in_streaming_mode()
>>>>>>         # .with_configuration(config)
>>>>>>         .build()
>>>>>>     )
>>>>>>     t_env = TableEnvironment.create(env_settings)
>>>>>>
>>>>>>     t_env.execute_sql(
>>>>>>         f"""
>>>>>>         CREATE TABLE kafka_test_logs (
>>>>>>             action2 STRING
>>>>>>         ) WITH (
>>>>>>             'connector' = 'kafka',
>>>>>>             'topic' = 'test_logs',
>>>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>>>             'properties.group.id' = 'flink_group',
>>>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>>>             'format' = 'json',
>>>>>>             'json.ignore-parse-errors' = 'true'
>>>>>>         )
>>>>>>         """)
>>>>>>
>>>>>>     t_env.execute_sql("""
>>>>>>         SELECT COUNT(*) AS message_count
>>>>>>         FROM kafka_test_logs
>>>>>>     """).print()
>>>>>>
>>>>>>     print('Table job has now completed.')
>>>>>>
>>>>>> if __name__ == "__main__":
>>>>>>     process_table()
>>>>>>
>>>>>> error:
>>>>>>
>>>>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
>>>>>> Could not find any factory for identifier 'kafka' that implements
>>>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>>>> flink_app |
>>>>>> flink_app | Available factory identifiers are:
>>>>>> flink_app |
>>>>>> flink_app | blackhole
>>>>>> flink_app | datagen
>>>>>> flink_app | filesystem
>>>>>> flink_app | print
>>>>>> flink_app | python-input-format
>>>>>>
>>>>>> The '/opt/flink/lib' directory has the JARs from both the image and the
>>>>>> JARs I added in the docker-compose.yml file.
>>>>>>
>>>>>> Stream API:
>>>>>>
>>>>>> def process_stream():
>>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>
>>>>>>     properties = {
>>>>>>         "bootstrap.servers": 'kafka:9092',
>>>>>>         "group.id": "test_group"
>>>>>>     }
>>>>>>
>>>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>>>         topics='test_logs',
>>>>>>         deserialization_schema=SimpleStringSchema(),
>>>>>>         properties=properties)
>>>>>>
>>>>>>     data_stream = env.add_source(kafka_consumer)
>>>>>>
>>>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>>>     parsed_stream.print()
>>>>>>
>>>>>>     env.execute("Kafka DataStream Test")
>>>>>>
>>>>>> if __name__ == "__main__":
>>>>>>     process_stream()
>>>>>>
>>>>>> error:
>>>>>>
>>>>>> TypeError: Could not found the Java class
>>>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java
>>>>>> dependencies could be specified via command line argument '--jarfile' or
>>>>>> the config option 'pipeline.jars'
>>>>>>
>>>>>> I have used the JAR dependencies based on:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>>>>
>>>>>> Kind regards
>>>>>> Phil