Hi Phil,

>>> Is it a library missing from the Flink cluster? What would cause that error?

In most cases this error implies a collision of jar dependencies. The flink-sql-connector-kafka jar alone should be enough to run the job, so you may need to check the classpath of your jobmanager to see if any other jars that contain Kafka classes are included. Another workaround is to set the config 'classloader.resolve-order' to 'parent-first' in your flink-conf.yaml and restart your Flink cluster.
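For example, a minimal check plus the config change (assuming the /opt/flink layout from the Dockerfile quoted below):

# list jars on the jobmanager classpath whose names mention Kafka;
# normally only flink-sql-connector-kafka-*.jar should be there
docker-compose exec jobmanager ls /opt/flink/lib | grep -i kafka

# flink-conf.yaml: load classes from Flink's own classpath before user code
classloader.resolve-order: parent-first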
>>>> java.lang.NoSuchFieldError: SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION

The field SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION does not exist in the Flink repo; it presumably comes from AWS's patched Flink build on EMR, so a mismatch between EMR's patched jars and vanilla Flink jars is a likely cause. It may be better to contact an AWS expert to find the cause of this problem.

Best,
Biao Geng

Phil Stavridis <phi...@gmail.com> wrote on Wed, 8 May 2024 at 17:51:

> I am running a Flink job locally using
>
> python -m job.py
>
> and it runs fine.
>
> The job is:
>
> calcul_count = t_env.execute_sql("""
>     SELECT
>         username,
>         COUNT(action) AS a_count
>     FROM kafka_logs
>     GROUP BY username
> """)
>
> with calcul_count.collect() as results:
>     for row in results:
>         print(row)
>
> When I try to submit it to the Flink JobManager running as a Docker service with:
>
> docker-compose exec -e KAFKA_BOOTSTRAP_SERVERS="kafka:9092" jobmanager flink run -py /opt/app/job.py
>
> I am getting the error:
>
> for row in results:
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 240, in __next__
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o176.hasNext.
> : java.lang.RuntimeException: Failed to fetch next result
>
> Caused by: java.io.IOException: Failed to fetch job execution result
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
>
> This is my Flink cluster Dockerfile:
>
> FROM flink:1.18.1-scala_2.12
>
> RUN apt-get update && \
>     apt-get install -y python3 python3-pip && \
>     ln -s /usr/bin/python3 /usr/bin/python && \
>     apt-get clean && \
>     rm -rf /var/lib/apt/lists/*
>
> RUN pip install apache-flink==1.18.1
>
> # Download the necessary Flink connector jars
> ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/
> ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar /opt/flink/lib/
>
> WORKDIR /opt/flink
>
> Is it a library missing from the Flink cluster? What would cause that error?
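For context, the kafka_logs table referenced by the job above would typically be declared via the Kafka connector roughly as follows (a minimal sketch; the topic name, group id, field list, and format are illustrative assumptions, not from the original post):

t_env.execute_sql("""
    CREATE TABLE kafka_logs (
        username STRING,
        action   STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'logs',                               -- assumed topic name
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'log-counter',          -- assumed group id
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")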
> I have also tried to run job.py by submitting it to a Flink cluster on EMR, which has the same version of Flink as my local Flink cluster running as a Docker container, with:
>
> /usr/lib/flink/bin/flink run -py /opt/app/job.py
>
> after copying the job onto the Flink JobManager on EMR, but I am getting the error:
>
> calcul_count = t_env.execute_sql("""
>   File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
>   File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>   File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o16.executeSql.
> : java.lang.NoSuchFieldError: SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION
>     at org.apache.flink.streaming.api.environment.CheckpointConfig.setSpotInstanceInterruptionNoticeDuration(CheckpointConfig.java:891)
>     at org.apache.flink.streaming.api.graph.StreamGraph.setSpotInstanceInterruptionNoticeDuration(StreamGraph.java:182)
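A quick way to look for that kind of mismatch on the EMR side (a sketch, assuming the default /usr/lib/flink install shown in the trace; EMR ships a patched Flink, so extra vanilla Flink or connector jars on its classpath can shadow the patched classes):

# the flink-dist jar name reveals the exact Flink build EMR ships
ls /usr/lib/flink/lib/flink-dist*.jar

# check for manually added Kafka/Flink jars that may conflict with EMR's patched ones
ls /usr/lib/flink/lib | grep -i kafka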