Hi Flink Team,

I am reading streaming data from RabbitMQ (version 3.10.7) with Flink
1.16.2, using RMQConnectionConfig to establish the connection. Here is
how I set up the connection:

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setPrefetchCount(smsInput.prefetchCount)
    .setHost(smsInput.HostServer)
    .setPort(smsInput.HostPort)
    .setUserName(smsInput.HostUserName)
    .setPassword(smsInput.HostPassword)
    .setVirtualHost("/")
    .build();

ConnectionFactory rabbitMQConnectionFactory = connectionConfig.getConnectionFactory();
// Intended to set the prefetch count
rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount);

DataStream<String> stream = executionEnvironment
    .addSource(new RMQSource<String>(
        connectionConfig,
        smsInput.QueueName,
        new SimpleStringSchema()))
    .setParallelism(1);


Additionally, I have configured the prefetch count so that 3 messages are
read from RabbitMQ at a time. Here is how I have enabled checkpointing:

executionEnvironment.enableCheckpointing(smsInput.checkpointIntervalMS, CheckpointingMode.EXACTLY_ONCE, true);

The prefetch count seems to work fine on its own, but when I run the job
with a parallelism of 3, it does not behave as expected.
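
To make clear what I mean by "expected", here is a minimal stand-alone
sketch of the prefetch behaviour I am assuming. It uses neither RabbitMQ
nor Flink; the class PrefetchWindow and its methods are hypothetical names
made up for this illustration. It also assumes that, with checkpointing
enabled, messages are acknowledged only when a checkpoint completes, which
is my understanding and not something I have verified in the connector.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a prefetch window; NOT RabbitMQ or Flink code.
class PrefetchWindow {
    private final int prefetchCount;                         // max unacknowledged messages
    private final Deque<String> queue = new ArrayDeque<>();  // messages still waiting in the broker
    private final List<String> unacked = new ArrayList<>();  // delivered but not yet acknowledged

    PrefetchWindow(int prefetchCount, List<String> messages) {
        this.prefetchCount = prefetchCount;
        this.queue.addAll(messages);
    }

    // Deliver messages until the window is full or the queue is empty.
    List<String> deliver() {
        List<String> delivered = new ArrayList<>();
        while (unacked.size() < prefetchCount && !queue.isEmpty()) {
            String message = queue.poll();
            unacked.add(message);
            delivered.add(message);
        }
        return delivered;
    }

    // Acknowledge everything delivered so far, which frees the window.
    // Assumption: in my job this corresponds to a completed checkpoint.
    void ackAll() {
        unacked.clear();
    }

    public static void main(String[] args) {
        PrefetchWindow window =
            new PrefetchWindow(3, List.of("m1", "m2", "m3", "m4", "m5"));
        System.out.println(window.deliver()); // [m1, m2, m3]
        System.out.println(window.deliver()); // [] (window full, nothing acknowledged yet)
        window.ackAll();
        System.out.println(window.deliver()); // [m4, m5]
    }
}
```

In other words, I expect at most 3 unacknowledged messages to be in flight
at any time, regardless of the parallelism of the downstream operators.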

The RabbitMQ source is pinned to a parallelism of 1 via setParallelism(1).
The other operators, which process the data read from RabbitMQ, run with a
parallelism of 3 because the job is submitted with the following command:

bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar \
    ../config/app-config.properties \
    -yD env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64

Could you please suggest how to configure the prefetch count so that it
works as expected when the job runs with a parallelism of 3?



Thanks,
Ajay Pandey
