From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
Sent: Monday, May 9, 2022 7:33 PM
To: 'user@flink.apache.org'
Cc: 'harshit.varsh...@iktara.ai'
Subject: Rabbitmq Connection error with Flink version(1.15.0)
Dear Team,
I am new to PyFlink and would appreciate your help with an issue I am
facing. I am using PyFlink version 1.15.0 with code adapted from the
PyFlink reference examples.
I am getting the following error:
Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
Below is my code for reference.
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
import logging
import os
import sys

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import SimpleStringSchema


def main():
    env = StreamExecutionEnvironment.get_execution_environment()

    # checkpointing is required for exactly-once or at-least-once guarantees
    env.enable_checkpointing(100)

    rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                'flink-connector-rabbitmq-1.15.0.jar')
    env.add_jars("file:///{}".format(rabbitmq_jar))

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("localhost") \
        .set_port(5672) \
        .build()

    stream = env \
        .add_source(RMQSource(
            connection_config,
            'hello',
            True,
            SimpleStringSchema(),
        )) \
        .set_parallelism(1)

    stream.print()

    env.execute('main')


if __name__ == '__main__':
    main()
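
The NoClassDefFoundError above names com/rabbitmq/client/ConnectionFactory, so one way to narrow this down is to check whether the jar passed to add_jars actually contains that class. The following is only a minimal sketch using the Python standard library; jar_contains_class is a hypothetical helper written for this check and is not part of PyFlink.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has an entry for class_name.

    Hypothetical helper for illustration only, not a PyFlink API.
    """
    # A jar is a zip archive; class a.b.C is stored as a/b/C.class
    entry = class_name.replace('.', '/') + '.class'
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Calling jar_contains_class(rabbitmq_jar, 'com.rabbitmq.client.ConnectionFactory') with the path from the code above would show whether the class is in that jar. If it returns False, the class would presumably have to come from a separate jar (the package name suggests the RabbitMQ Java client) that is also passed to add_jars.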
Thanks,
Harshit