Hi Harshit,

You should use
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-rabbitmq/1.15.0/flink-sql-connector-rabbitmq-1.15.0.jar
instead of flink-connector-rabbitmq-1.15.0.jar. It is a fat jar that
contains all the dependencies.
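
For illustration, here is a minimal sketch of how the fat jar could be registered from Python. It assumes the jar has been downloaded into the current working directory. The helper names `jar_uri` and `register_connector` are hypothetical and are not part of PyFlink; only `env.add_jars` is the real call, the same one used in the quoted script.

```python
import os
from pathlib import Path

# File name taken from the URL above; assumed to be in the current working directory.
FAT_JAR = "flink-sql-connector-rabbitmq-1.15.0.jar"


def jar_uri(jar_path):
    """Return a file:// URI for jar_path (hypothetical helper)."""
    # as_uri() needs an absolute path; it emits forward slashes on Windows too.
    return Path(os.path.abspath(jar_path)).as_uri()


def register_connector(env, jar_path=FAT_JAR):
    """Register the jar on a StreamExecutionEnvironment (hypothetical helper)."""
    uri = jar_uri(jar_path)
    # Call this before building RMQConnectionConfig, as in the quoted script.
    env.add_jars(uri)
    return uri
```

With PyFlink installed, this would be called as `register_connector(StreamExecutionEnvironment.get_execution_environment())` in place of the `os.path.join` and `env.add_jars` lines in the quoted script.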

Regards,
Dian

On Mon, May 9, 2022 at 10:05 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

>
>
>
>
> *From:* harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai]
> *Sent:* Monday, May 9, 2022 7:33 PM
> *To:* 'user@flink.apache.org'
> *Cc:* 'harshit.varsh...@iktara.ai'
> *Subject:* Rabbitmq Connection error with Flink version(1.15.0)
>
>
>
> Dear Team,
>
>
>
> I am new to PyFlink and would appreciate your support with an issue I am
> facing. I am using PyFlink version 1.15.0 with code taken from the PyFlink
> reference code.
>
>
>
> I am getting the following error:
>
> Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
> Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
>
> ERROR:root:Exception while sending command.
> Traceback (most recent call last):
>   File "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py", line 1159, in send_command
>     raise Py4JNetworkError("Answer from Java side is empty")
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
>
>
>
> Below is my code for reference.
>
>
>
> from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer, RMQConnectionConfig, RMQSource
> import logging
> import os
> import sys
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.common import SimpleStringSchema
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     # checkpointing is required for exactly-once or at-least-once guarantees
>     env.enable_checkpointing(100)
>     rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                                 'flink-connector-rabbitmq-1.15.0.jar')
>     env.add_jars("file:///{}".format(rabbitmq_jar))
>
>     connection_config = RMQConnectionConfig.Builder() \
>         .set_host("localhost") \
>         .set_port(5672) \
>         .build()
>
>     stream = env \
>         .add_source(RMQSource(
>             connection_config,
>             'hello',
>             True,
>             SimpleStringSchema(),
>         )) \
>         .set_parallelism(1)
>
>     stream.print()
>     env.execute('main')
>
> if __name__ == '__main__':
>     main()
>
>
>
>
>
> Thanks,
>
> Harshit
>
>
>
>
>
