I am running a Flink job locally with

python job.py

and it runs fine.

The job is:

calcul_count = t_env.execute_sql("""
    SELECT
        username,
        COUNT(action) AS a_count
    FROM kafka_logs
    GROUP BY username
""")

with calcul_count.collect() as results:
    for row in results:
        print(row)
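For context, the query assumes a kafka_logs source table registered earlier in job.py. A minimal DDL sketch of such a table — the topic name, consumer group, format, and startup mode here are assumptions, not taken from the actual job:

```sql
CREATE TABLE kafka_logs (
    username STRING,
    action STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'logs',                              -- assumed topic name
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-job',           -- assumed consumer group
    'scan.startup.mode' = 'earliest-offset',       -- assumed startup mode
    'format' = 'json'
)
```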


When I try to submit it to the Flink JobManager running as a Docker service with:

docker-compose exec -e KAFKA_BOOTSTRAP_SERVERS="kafka:9092" jobmanager flink run -py /opt/app/job.py
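Since the bootstrap servers are passed through an environment variable here, job.py presumably picks them up along these lines (the variable name matches the command above; the fallback default is an assumption):

```python
import os

# Read the Kafka bootstrap servers from the environment, falling back to a
# local default when the variable is not set (the fallback value is assumed).
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# The value would then be interpolated into the table DDL, e.g.:
ddl_fragment = f"'properties.bootstrap.servers' = '{bootstrap_servers}'"
print(ddl_fragment)
```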


I get this error:

    for row in results:
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 240, in __next__
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o176.hasNext.
: java.lang.RuntimeException: Failed to fetch next result

Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: class org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer


This is my Flink cluster Dockerfile:

FROM flink:1.18.1-scala_2.12

RUN apt-get update && \
    apt-get install -y python3 python3-pip && \
    ln -s /usr/bin/python3 /usr/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

RUN pip install apache-flink==1.18.1

# Download the necessary Flink connector jars
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib/
ADD https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar /opt/flink/lib/

WORKDIR /opt/flink

Is a library missing from the Flink cluster? What would cause this error?
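The shaded class name in the last "Caused by" (org.apache.flink.kafka.shaded...) usually points at two copies of the Kafka classes on the classpath: the shaded ones bundled inside flink-sql-connector-kafka plus an unshaded kafka-clients jar loaded elsewhere. A diagnostic sketch for checking which jars ship unshaded Kafka classes — the lib path is taken from the Dockerfile above, everything else is an assumption:

```python
import glob
import zipfile

# An unshaded kafka-clients jar contains this class at its original path;
# the shaded connector relocates it under org/apache/flink/kafka/shaded/.
MARKER = "org/apache/kafka/common/serialization/Deserializer.class"

suspects = []
for jar in glob.glob("/opt/flink/lib/*.jar"):
    with zipfile.ZipFile(jar) as zf:
        if MARKER in zf.namelist():
            suspects.append(jar)

# Any hit here, sitting next to the shaded flink-sql-connector-kafka jar,
# is a candidate for the "is not an instance of Deserializer" clash.
print(suspects)
```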


I have also tried to submit job.py to a Flink cluster on EMR, which runs the same Flink version as my local Docker cluster, with:

/usr/lib/flink/bin/flink run -py /opt/app/job.py

after copying the job onto the EMR JobManager, but I get this error:

    calcul_count = t_env.execute_sql("""
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 837, in execute_sql
  File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/lib/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/usr/lib/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o16.executeSql.
: java.lang.NoSuchFieldError: SPOT_INSTANCE_INTERRUPTION_NOTICE_DURATION
    at org.apache.flink.streaming.api.environment.CheckpointConfig.setSpotInstanceInterruptionNoticeDuration(CheckpointConfig.java:891)
    at org.apache.flink.streaming.api.graph.StreamGraph.setSpotInstanceInterruptionNoticeDuration(StreamGraph.java:182)
