Hi Biao,

For submitting the job, I run t_env.execute_sql.
Shouldn’t that be sufficient for submitting a job with the Table API in PyFlink?
Isn’t that the recommended way to submit and run PyFlink jobs on a running
Flink cluster? The Flink cluster itself runs without issues, but no job appears
under Running Jobs when the flink_app container, which holds the client code,
runs. I was able to submit a simple collect job from the Flink SQL console, and
it did show up on the Running Jobs tab, but it was not ingesting data from the
Kafka topic either.
Does my client’s environment need the flink-sql-connector-kafka JAR as well, or
is it only needed on the classpath of the Flink JobManager and TaskManager?
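For example, would something along these lines be needed in the client code?
(Just a sketch; the JAR path is a guess at where the connector would sit inside
the flink_app container, and I have not verified that this is the right fix.)

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Sketch, not verified: point the PyFlink client at the Kafka SQL connector JAR
# via 'pipeline.jars' so the 'kafka' factory can also be found on the client side.
# The path assumes the JAR is also mounted into the flink_app container.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)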

Kind regards
Phil


> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
> 
> Hi Phil,
> 
> Thanks for sharing the detailed information of the job. For your question: how
> do you submit the job? After applying your yaml file, I think you will
> successfully launch a Flink cluster with 1 JM and 1 TM. Then you would submit
> the PyFlink job to that Flink cluster. As the error you showed is a client-side
> error, it is possible that your client's env does not contain the
> flink-sql-connector-kafka jar, which may lead to the exception.
> By the way, the "Table API" code in your mail is actually using the Flink SQL
> API, so the flink-sql-connector-kafka jar is required, which is exactly what
> you have prepared. For PyFlink's Table API, you can have a look at this
> document:
> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
> 
> Best,
> Biao Geng
> 
> 
> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 03:10:
>> Hello,
>> 
>> I have set up Flink and Kafka containers with docker-compose to test how
>> Flink processes Kafka messages.
>> I primarily want to check how the Table API works, but also how the DataStream
>> API would process the Kafka messages.
>> 
>> I have included the main part of the docker-compose.yaml file I am using for
>> the Flink cluster. I have commented out some of the JAR files I have tried,
>> but I have mainly used the flink-sql-connector-kafka JAR. I would like to
>> confirm whether any of my configuration is wrong, which JARs I should be using
>> for each API, and whether a single JAR is enough for both the Table and
>> DataStream APIs.
>> I have also included the Flink client module I have used for both the Table
>> and the DataStream APIs, along with the error messages.
>> Any idea what is missing, or whether any of the configuration looks wrong?
>> 
>> docker-compose.yml
>>   flink_jobmanager:
>>     image: flink:1.18.1-scala_2.12
>>     container_name: flink_jobmanager
>>     ports:
>>       - "8081:8081"
>>     command: jobmanager
>>     volumes:
>>       - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>       # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>       # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>       # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>     environment:
>>       - |
>>         FLINK_PROPERTIES=
>>         jobmanager.rpc.address: jobmanager
>>     networks:
>>       - standard
>>     depends_on:
>>       - kafka
>> 
>>   flink_taskmanager:
>>     image: flink:1.18.1-scala_2.12
>>     container_name: flink_taskmanager
>>     volumes:
>>       - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>       # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>       # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>       # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>     depends_on:
>>       - kafka
>>       - jobmanager
>>     command: taskmanager
>>     environment:
>>       - |
>>         FLINK_PROPERTIES=
>>         jobmanager.rpc.address: jobmanager
>>         taskmanager.numberOfTaskSlots: 2
>>     networks:
>>       - standard
>> 
>> Table API:
>> from pyflink.table import EnvironmentSettings, TableEnvironment
>> 
>> 
>> def process_table():
>>     print('Running the Table job now.')
>>     env_settings = (
>>         EnvironmentSettings
>>         .new_instance()
>>         .in_streaming_mode()
>>         # .with_configuration(config)
>>         .build()
>>     )
>>     t_env = TableEnvironment.create(env_settings)
>> 
>>     t_env.execute_sql(
>>         f"""
>>         CREATE TABLE kafka_test_logs (
>>             action2 STRING
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'test_logs',
>>             'properties.bootstrap.servers' = 'kafka:9092',
>>             'properties.group.id' = 'flink_group',
>>             'scan.startup.mode' = 'earliest-offset',
>>             'format' = 'json',
>>             'json.ignore-parse-errors' = 'true'
>>         )
>>         """)
>> 
>>     t_env.execute_sql("""
>>         SELECT COUNT(*) AS message_count
>>         FROM kafka_test_logs
>>     """).print()
>> 
>>     print('Table job has now completed.')
>> 
>> 
>> if __name__ == "__main__":
>>     process_table()
>> error:
>> flink_app | Caused by: org.apache.flink.table.api.ValidationException: Could
>> not find any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>> flink_app |
>> flink_app | Available factory identifiers are:
>> flink_app |
>> flink_app | blackhole
>> flink_app | datagen
>> flink_app | filesystem
>> flink_app | print
>> flink_app | python-input-format
>> The '/opt/flink/lib' directory has the JARs from the image as well as the JARs
>> I added in the docker-compose.yml file.
>> 
>> Stream API:
>> import json
>> 
>> from pyflink.common.serialization import SimpleStringSchema
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
>> 
>> 
>> def process_stream():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>> 
>>     properties = {
>>         "bootstrap.servers": 'kafka:9092',
>>         "group.id": "test_group"
>>     }
>> 
>>     kafka_consumer = FlinkKafkaConsumer(
>>         topics='test_logs',
>>         deserialization_schema=SimpleStringSchema(),
>>         properties=properties)
>> 
>>     data_stream = env.add_source(kafka_consumer)
>> 
>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>                                     output_type=Types.ROW([Types.STRING()]))
>>     parsed_stream.print()
>> 
>>     env.execute("Kafka DataStream Test")
>> 
>> 
>> if __name__ == "__main__":
>>     process_stream()
>> error:
>> TypeError: Could not found the Java class 
>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java 
>> dependencies could be specified via command line argument '--jarfile' or the 
>> config option 'pipeline.jars'
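>> 
>> Based on that hint, would something along these lines be needed on the client
>> side? (Just a sketch, not verified; the path below is where I mount the
>> connector on the cluster containers, and I am not sure the client container
>> has it there.)
>> 
>> # Sketch, assuming the connector JAR is also available inside the flink_app
>> # container at this path; add_jars / 'pipeline.jars' would ship it with the job.
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.add_jars("file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar")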
>> 
>> I have used the JAR dependencies based on:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>> 
>> Kind regards
>> Phil
