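Hi Harshit,

You should use https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar, which is a fat jar containing all the dependencies. The flink-connector-rabbitmq-1.15.0.jar that your script loads does not bundle the RabbitMQ client classes, which is why com.rabbitmq.client.ConnectionFactory cannot be found.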
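
As a rough sketch of how the jar loading in your script could change, assuming the fat jar has been downloaded into the same directory as the script: the names FAT_JAR_NAME, jar_url and build_env are made up for illustration, and only get_execution_environment() and add_jars() come from PyFlink itself.

```python
from pathlib import Path

# Assumption: the fat jar from the URL above was saved under this file name.
FAT_JAR_NAME = "flink-sql-connector-rabbitmq-1.15.0.jar"


def jar_url(directory):
    """Return a file:// URL for the fat jar located in `directory`."""
    # resolve() makes the path absolute, which as_uri() requires; as_uri()
    # also produces the right form on Windows (file:///C:/...).
    return (Path(directory).resolve() / FAT_JAR_NAME).as_uri()


def build_env(jar_dir):
    """Create a StreamExecutionEnvironment with the fat jar registered."""
    # Imported here so that jar_url() can be checked without PyFlink installed.
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars(jar_url(jar_dir))
    return env
```

In your main() this would stand in for the os.path.join(...) and env.add_jars(...) lines, for example env = build_env(os.path.dirname(__file__)); the rest of the job can stay as it is.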

Regards,
Dian

On Mon, May 9, 2022 at 10:05 PM harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote:

> *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
> *Sent:* Monday, May 9, 2022 7:33 PM
> *To:* 'user@flink.apache.org'
> *Cc:* 'harshit.varsh...@iktara.ai'
> *Subject:* Rabbitmq Connection error with Flink version(1.15.0)
>
> Dear Team,
>
> I am new to PyFlink and request your support with an issue I am facing.
> I am using PyFlink version 1.15.0 with code based on the PyFlink
> reference code.
>
> I am getting the following error.
>
> Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
> Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
> ERROR:root:Exception while sending command.
> Traceback (most recent call last):
>   File "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py", line 1159, in send_command
>     raise Py4JNetworkError("Answer from Java side is empty")
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
> py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
>
> Below is my code for reference.
>
> from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
> import logging
> import os
> import sys
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import SimpleStringSchema
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # checkpointing is required for exactly-once or at-least-once guarantees
>     env.enable_checkpointing(100)
>     rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                                 'flink-connector-rabbitmq-1.15.0.jar')
>     env.add_jars("file:///{}".format(rabbitmq_jar))
>
>     connection_config = RMQConnectionConfig.Builder() \
>         .set_host("localhost") \
>         .set_port(5672) \
>         .build()
>
>     stream = env \
>         .add_source(RMQSource(
>             connection_config,
>             'hello',
>             True,
>             SimpleStringSchema(),
>         )) \
>         .set_parallelism(1)
>
>     stream.print()
>     env.execute('main')
>
>
> if __name__ == '__main__':
>     main()
>
> Thanks,
> Harshit