I am running a Flink job locally with `python job.py`
and it runs fine. The job is:

```python
calcul_count = t_env.execute_sql("""
    SELECT username, COUNT(action) AS a_count
    FROM kafka_logs
    GROUP BY username
""")

with calcul_count.collect() as results:
    for row in results:
        print(row)
```

When I try to submit it to the Flink JobManager running as a Docker service with:

```shell
docker-compose exec -e KAFKA_BOOTSTRAP_SERVERS="kafka:9092" jobmanager flink run -py /opt/app/job.py
```

I get this error:

```
  for row in results:
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 240, in __next__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o176.hasNext.
: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
```

This is the Dockerfile for my Flink cluster:

```dockerfile
FROM flink:1.18.1-scala_2.12

RUN apt-get update && \
    apt-get install -y python3 python3-pip && \
    ln -s /usr/bin/python3 /usr/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

RUN pip install apache-flink==1.18.1

# Download the necessary Flink connector jars
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar /opt/flink/lib/

WORKDIR /opt/flink
```

Is a library missing from the Flink cluster? What would cause that error?

I have also tried submitting job.py to a Flink cluster on EMR, which runs the same Flink version as my local Docker cluster. After copying the job onto the EMR JobManager, I ran:

```shell
/usr/lib/flink/bin/flink run -py /opt/app/job.py
```

but I get this error:

```
  calcul_count = t_env.execute_sql("""
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
  File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o16.executeSql.
: java.lang.NoSuchFieldError: SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION
	at org.apache.flink.streaming.api.environment.CheckpointConfig.setSpotInstanceInterruptionNoticeDuration(CheckpointConfig.java:891)
	at org.apache.flink.streaming.api.graph.StreamGraph.setSpotInstanceInterruptionNoticeDuration(StreamGraph.java:182)
```
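For completeness, the `kafka_logs` source table is registered in job.py with a DDL roughly like the sketch below. The topic name and consumer group id are placeholders, not the exact values from my job; the bootstrap servers value is what `KAFKA_BOOTSTRAP_SERVERS` resolves to inside the Docker network.

```sql
CREATE TABLE kafka_logs (
    username STRING,
    action   STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'logs',                               -- placeholder topic name
    'properties.bootstrap.servers' = 'kafka:9092',  -- from KAFKA_BOOTSTRAP_SERVERS
    'properties.group.id' = 'flink-logs',           -- placeholder group id
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
```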