Hi Biao,

1. I have a Flink client container like this:
# Flink client
flink_client:
  container_name: flink_client
  image: flink-client:local
  build:
    context: .
    dockerfile: flink_client/Dockerfile
  networks:
    - standard
  depends_on:
    - jobmanager
    - kafka

The flink_client/Dockerfile points to this bash script, which runs the Flink 
client only once the Flink cluster is up and running:
Dockerfile:
ENTRYPOINT ["./run_when_ready.sh"]

run_when_ready.sh:

#!/bin/bash

wait_for_jobmanager() {
  echo "Waiting for Flink JobManager to be ready..."
  while ! nc -z jobmanager 8081; do
    sleep 1
  done
  echo "Flink JobManager is ready."
}

# Call the function
wait_for_jobmanager

# Now run the PyFlink job
python -m flink_client.job
flink_client/job.py is the file I attached earlier, which has:

>>>> if __name__ == "__main__":
>>>>    process_table()


So it runs that file, which should submit the Table job to the Flink cluster, 
since the file runs:
>>>> t_env.execute_sql("""
>>>> SELECT COUNT(*) AS message_count
>>>> FROM kafka_test_logs
>>>> """).print()


I don’t use flink run -t …
Is this the wrong way to run PyFlink jobs?

2. Also, do I need to add the flink-sql-connector JAR to the 
flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead of 
relying on t_env.execute_sql(…), run it with flink run -t … and the JAR file as 
you mentioned earlier?
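
For context, this is roughly the alternative I had in mind: keeping 
python -m flink_client.job and pointing the client at the connector JAR through 
pipeline.jars instead of passing -j to flink run. This is only a minimal sketch 
on my side; it assumes the connector JAR is also copied or mounted into the 
flink_client image at /opt/flink/lib:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Assumed path inside the flink_client image; the JAR would have to be
# copied or mounted there so the client can resolve the 'kafka' connector
# while generating the job graph.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)

Would that be equivalent to passing -j on the flink run command line, or is the 
CLI still the recommended route?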


> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
> 
> Hi Phil,
> 
> Your code looks good. I meant: how do you run the Python script? Maybe you are 
> using the Flink CLI, i.e. running commands like `flink run -t .. -py job.py -j 
> /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j 
> /path/to/flink-sql-kafka-connector.jar` is necessary so that, on the client 
> side, Flink can generate the job graph successfully.
> 
> As for the second question, in your case, yes, your client’s env needs to have 
> the flink-sql-kafka-connector JAR. You can check the doc 
> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
> for more details. In short, your yaml shows that you are using session mode, 
> which needs the connector JAR to generate the job graph on the client side.
> 
> 
> Best,
> Biao Geng
> 
> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 18:14:
>> Hi Biao,
>> 
>> For submitting the job, I run t_env.execute_sql.
>> Shouldn’t that be sufficient for submitting the job using the Table API with 
>> PyFlink? Isn’t that the recommended way of submitting and running PyFlink 
>> jobs on a running Flink cluster? The Flink cluster runs without issues, but 
>> there is no job under Running Jobs when the container flink_app, which has 
>> the client code, runs.
>> I could submit a job (a simple collect) using the Flink SQL console, and it 
>> was available on the Running Jobs tab, but that was also not ingesting data 
>> from the Kafka topic.
>> Does my client’s env need to have the flink-sql-kafka-connector JAR, or is it 
>> needed only on the classpath of the Flink JobManager and TaskManager?
>> 
>> Kind regards
>> Phil
>> 
>> 
>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>> 
>>> Hi Phil,
>>> 
>>> Thanks for sharing the detailed information about the job. As for your 
>>> question: how do you submit the job? After applying your yaml file, I think 
>>> you will successfully launch a Flink cluster with 1 JM and 1 TM. Then you 
>>> would submit the PyFlink job to that cluster. As the error you showed is a 
>>> client-side error, it is possible that your client's env does not contain 
>>> the flink-sql-kafka-connector JAR, which may lead to the exception.
>>> By the way, the "Table API" code in your mail is actually using the Flink 
>>> SQL API, so the flink-sql-kafka-connector JAR is required, which is exactly 
>>> what you have prepared. For PyFlink's Table API, you can have a look at 
>>> this document: 
>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> 
>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 03:10:
>>>> Hello,
>>>> 
>>>> I have set up Flink and Kafka containers using docker-compose, for testing 
>>>> how Flink works for processing Kafka messages.
>>>> I primarily want to check how the Table API works but also how the Stream 
>>>> API would process the Kafka messages.
>>>> 
>>>> I have included the main part of the docker-compose.yaml file I am using 
>>>> for the Flink cluster. I have commented out some of the JAR files I have 
>>>> tried, but I have mainly used the flink-sql-connector-kafka JAR. I would 
>>>> like to confirm whether I am using any wrong configuration, which JARs I 
>>>> should be using for each API, and whether I just need one JAR for both the 
>>>> Table and DataStream APIs.
>>>> I have also included the Flink client module I have used for both the 
>>>> Table and DataStream APIs, and the error messages.
>>>> Any idea what is missing or if there is any configuration that seems wrong?
>>>> 
>>>> docker-compose.yml:
>>>> 
>>>> flink_jobmanager:
>>>>   image: flink:1.18.1-scala_2.12
>>>>   container_name: flink_jobmanager
>>>>   ports:
>>>>     - "8081:8081"
>>>>   command: jobmanager
>>>>   volumes:
>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>   environment:
>>>>     - |
>>>>       FLINK_PROPERTIES=
>>>>       jobmanager.rpc.address: jobmanager
>>>>   networks:
>>>>     - standard
>>>>   depends_on:
>>>>     - kafka
>>>> 
>>>> flink_taskmanager:
>>>>   image: flink:1.18.1-scala_2.12
>>>>   container_name: flink_taskmanager
>>>>   volumes:
>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>   depends_on:
>>>>     - kafka
>>>>     - jobmanager
>>>>   command: taskmanager
>>>>   environment:
>>>>     - |
>>>>       FLINK_PROPERTIES=
>>>>       jobmanager.rpc.address: jobmanager
>>>>       taskmanager.numberOfTaskSlots: 2
>>>>   networks:
>>>>     - standard
>>>> 
>>>> Table API:
>>>> 
>>>> def process_table():
>>>>     print('Running the Table job now.')
>>>> 
>>>>     env_settings = (
>>>>         EnvironmentSettings
>>>>         .new_instance()
>>>>         .in_streaming_mode()
>>>>         # .with_configuration(config)
>>>>         .build()
>>>>     )
>>>>     t_env = TableEnvironment.create(env_settings)
>>>> 
>>>>     t_env.execute_sql(
>>>>         f"""
>>>>         CREATE TABLE kafka_test_logs (
>>>>             action2 STRING
>>>>         ) WITH (
>>>>             'connector' = 'kafka',
>>>>             'topic' = 'test_logs',
>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>             'properties.group.id' = 'flink_group',
>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>             'format' = 'json',
>>>>             'json.ignore-parse-errors' = 'true'
>>>>         )
>>>>         """)
>>>> 
>>>>     t_env.execute_sql("""
>>>>         SELECT COUNT(*) AS message_count
>>>>         FROM kafka_test_logs
>>>>     """).print()
>>>> 
>>>>     print('Table job has now completed.')
>>>> 
>>>> 
>>>> if __name__ == "__main__":
>>>>     process_table()
>>>> 
>>>> error:
>>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException: 
>>>> Could not find any factory for identifier 'kafka' that implements 
>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>> flink_app |
>>>> flink_app | Available factory identifiers are:
>>>> flink_app |
>>>> flink_app | blackhole
>>>> flink_app | datagen
>>>> flink_app | filesystem
>>>> flink_app | print
>>>> flink_app | python-input-format
>>>> The '/opt/flink/lib' directory has the JARs from the base image plus the 
>>>> JARs I added in the docker-compose.yml file.
>>>> 
>>>> Stream API:
>>>> 
>>>> def process_stream():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>> 
>>>>     properties = {
>>>>         "bootstrap.servers": 'kafka:9092',
>>>>         "group.id": "test_group"
>>>>     }
>>>> 
>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>         topics='test_logs',
>>>>         deserialization_schema=SimpleStringSchema(),
>>>>         properties=properties)
>>>> 
>>>>     data_stream = env.add_source(kafka_consumer)
>>>> 
>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>     parsed_stream.print()
>>>> 
>>>>     env.execute("Kafka DataStream Test")
>>>> 
>>>> 
>>>> if __name__ == "__main__":
>>>>     process_stream()
>>>> 
>>>> error:
>>>> TypeError: Could not found the Java class 
>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java 
>>>> dependencies could be specified via command line argument '--jarfile' or 
>>>> the config option 'pipeline.jars'
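>>>> 
>>>> What I was planning to try next, as a rough sketch on my side (the JAR path 
>>>> and the KafkaSource-based API are my own assumptions, not something I have 
>>>> verified), is to register the JAR with the environment and use KafkaSource 
>>>> instead of FlinkKafkaConsumer:
>>>> 
>>>> from pyflink.common import WatermarkStrategy
>>>> from pyflink.common.serialization import SimpleStringSchema
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
>>>> 
>>>> env = StreamExecutionEnvironment.get_execution_environment()
>>>> # Assumed path: the same connector JAR mounted into the client container.
>>>> env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")
>>>> 
>>>> source = (
>>>>     KafkaSource.builder()
>>>>     .set_bootstrap_servers("kafka:9092")
>>>>     .set_topics("test_logs")
>>>>     .set_group_id("test_group")
>>>>     .set_starting_offsets(KafkaOffsetsInitializer.earliest())
>>>>     .set_value_only_deserializer(SimpleStringSchema())
>>>>     .build()
>>>> )
>>>> data_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka_source")
>>>> data_stream.print()
>>>> env.execute("Kafka DataStream Test")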
>>>> 
>>>> I have used the JAR dependencies based on:
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>> 
>>>> Kind regards
>>>> Phil
>> 
