From: harshit.varsh...@iktara.ai [mailto:harshit.varsh...@iktara.ai] 
Sent: Monday, May 9, 2022 7:33 PM
To: 'user@flink.apache.org'
Cc: 'harshit.varsh...@iktara.ai'
Subject: Rabbitmq Connection error with Flink version(1.15.0)

 

Dear Team,

 

I am new to PyFlink and would appreciate your help with an issue I am
facing. I am using PyFlink 1.15.0 with code based on the PyFlink
reference code.

 

I am getting the following error:

Exception in thread "Thread-4" java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Admin\PycharmProjects\pythonProject15\venv\lib\site-packages\py4j\java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM

 

Below is my code for reference.

 

from pyflink.datastream.connectors import (FlinkKafkaProducer,
                                           FlinkKafkaConsumer,
                                           RMQConnectionConfig, RMQSource)
import logging
import os
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import SimpleStringSchema


def main():
    env = StreamExecutionEnvironment.get_execution_environment()

    # checkpointing is required for exactly-once or at-least-once guarantees
    env.enable_checkpointing(100)

    # the connector jar is expected to sit next to this script
    rabbitmq_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                                'flink-connector-rabbitmq-1.15.0.jar')
    env.add_jars("file:///{}".format(rabbitmq_jar))

    connection_config = RMQConnectionConfig.Builder() \
        .set_host("localhost") \
        .set_port(5672) \
        .build()

    # read from the 'hello' queue, with correlation ids enabled (True)
    stream = env \
        .add_source(RMQSource(
            connection_config,
            'hello',
            True,
            SimpleStringSchema(),
        )) \
        .set_parallelism(1)

    stream.print()
    env.execute('main')


if __name__ == '__main__':
    main()
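
A NoClassDefFoundError like the one above means the JVM could not load the named class, so one thing worth checking is whether the jar passed to add_jars contains that class at all. The following is only a minimal sketch of such a check: jar_contains_class is a hypothetical helper written for illustration (it is not part of PyFlink), and it relies only on the fact that a jar is a zip archive in which classes are stored under slash-separated paths.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path has an entry for class_name.

    jar_contains_class is a hypothetical helper, not a PyFlink API.
    class_name is a dotted Java name such as
    'com.rabbitmq.client.ConnectionFactory'.
    """
    # Class files are stored in the jar under a slash-separated path.
    entry = class_name.replace('.', '/') + '.class'
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

It could be called as jar_contains_class(rabbitmq_jar, 'com.rabbitmq.client.ConnectionFactory'). If that returns False, the class would have to be supplied by some other jar on the classpath.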

 

 

Thanks,

Harshit

 

 
