Hi Phil,

Thanks for sharing the detailed information about the job. A question back to
you: how do you submit the job? After applying your yaml file, you should
successfully launch a Flink cluster with 1 JM and 1 TM, and you would then
submit the PyFlink job to that cluster. As the error you showed is a
client-side error, it is possible that your client's environment does not
contain the flink-sql-connector-kafka jar, which would lead to the exception.
By the way, the "Table API" code in your mail is actually using the Flink SQL
API, so the flink-sql-connector-kafka jar is required, which is exactly what
you have prepared. For PyFlink's Table API, you can have a look at this
document:
https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
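
If the PyFlink script is executed on your host (outside the Flink containers),
keep in mind that the jars you mounted into /opt/flink/lib are only visible to
the JM/TM, not to the local client. A rough sketch of how you could point the
client at the connector jar (the file path below is only an example; use
wherever the jar lives on the machine that runs the script):

    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.datastream import StreamExecutionEnvironment

    # Table API / SQL job: register the Kafka connector jar with the client so
    # that the 'kafka' connector factory can be found when the job is planned.
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = TableEnvironment.create(env_settings)
    t_env.get_config().set(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka-3.1.0-1.18.jar")

    # DataStream job: the same jar can be registered on the execution
    # environment before creating the FlinkKafkaConsumer.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///path/to/flink-sql-connector-kafka-3.1.0-1.18.jar")

Alternatively, you can copy the script into the jobmanager container and submit
it there with "flink run --python your_job.py", so that the jars already under
/opt/flink/lib are on the classpath.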

Best,
Biao Geng


Phil Stavridis <phi...@gmail.com> wrote on Wed, Apr 10, 2024, at 03:10:

> Hello,
>
> I have set up Flink and Kafka containers using docker-compose, for testing
> how Flink works for processing Kafka messages.
> I primarily want to check how the Table API works but also how the Stream
> API would process the Kafka messages.
>
> I have included the main part of the docker-compose.yaml file I am using
> for the Flink cluster. I have commented out some of the JAR files I have
> tried out but I have mainly used the flink-sql-connector-kafka JAR. I would
> like to confirm whether I am using any wrong configuration, which JARs I
> should be using for each API, and whether I just need one JAR for both the
> Table and DataStream APIs.
> I have also included the Flink client module I have used for both the
> Table and the Datastream APIs and the error messages.
> Any idea what is missing or if there is any configuration that seems wrong?
>
> docker-compose.yml:
>
> flink_jobmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_jobmanager
>   ports:
>     - "8081:8081"
>   command: jobmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>   networks:
>     - standard
>   depends_on:
>     - kafka
>
> flink_taskmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_taskmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   depends_on:
>     - kafka
>     - jobmanager
>   command: taskmanager
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>       taskmanager.numberOfTaskSlots: 2
>   networks:
>     - standard
>
> Table API:
>
> def process_table():
>     print('Running the Table job now.')
>     env_settings = (
>         EnvironmentSettings
>         .new_instance()
>         .in_streaming_mode()
>         # .with_configuration(config)
>         .build()
>     )
>     t_env = TableEnvironment.create(env_settings)
>
>     t_env.execute_sql(
>         f"""
>         CREATE TABLE kafka_test_logs (
>             action2 STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'test_logs',
>             'properties.bootstrap.servers' = 'kafka:9092',
>             'properties.group.id' = 'flink_group',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'json',
>             'json.ignore-parse-errors' = 'true'
>         )
>         """)
>
>     t_env.execute_sql("""
>         SELECT COUNT(*) AS message_count
>         FROM kafka_test_logs
>     """).print()
>
>     print('Table job has now completed.')
>
>
> if __name__ == "__main__":
>     process_table()
> error:
> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> flink_app |
> flink_app | Available factory identifiers are:
> flink_app |
> flink_app | blackhole
> flink_app | datagen
> flink_app | filesystem
> flink_app | print
> flink_app | python-input-format
> The '/opt/flink/lib' directory has the JARs from the image as well as the
> JARs I added in the docker-compose.yml file.
>
> Stream API:
>
> def process_stream():
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     properties = {
>         "bootstrap.servers": 'kafka:9092',
>         "group.id": "test_group"
>     }
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test_logs',
>         deserialization_schema=SimpleStringSchema(),
>         properties=properties)
>
>     data_stream = env.add_source(kafka_consumer)
>
>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>                                     output_type=Types.ROW([Types.STRING()]))
>     parsed_stream.print()
>
>     env.execute("Kafka DataStream Test")
>
>
> if __name__ == "__main__":
>     process_stream()
> error:
> TypeError: Could not found the Java class
> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java
> dependencies could be specified via command line argument '--jarfile' or
> the config option 'pipeline.jars'
>
> I have used the JAR dependencies based on:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>
> Kind regards
> Phil
>
