Hi Flink Team,
I am currently reading streaming data from RabbitMQ and using the RMQConnectionConfig for establishing the connection. Here's how I'm setting up the connection: and we use flink version 1.16.2 and RabbitMQ version 3.10.7 RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setPrefetchCount(smsInput.prefetchCount) .setHost(smsInput.HostServer) .setPort(smsInput.HostPort) .setUserName(smsInput.HostUserName) .setPassword(smsInput.HostPassword) .setVirtualHost("/") .build(); ConnectionFactory rabbitMQConnectionFactory = connectionConfig.getConnectionFactory(); rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount); // Set prefetchcount DataStream<String> stream = executionEnvironment.addSource(new RMQSource<String>(connectionConfig, smsInput.QueueName, new SimpleStringSchema())) .setParallelism(1); Additionally, I have configured the prefetch count to read 3 data at the same time from RabbitMQ. Here's how I have enabled the checkpointing interval. executionEnvironment.enableCheckpointing(smsInput.checkpointIntervalMS,CheckpointingMode.EXACTLY_ONCE,true); The prefetch count seems to be working fine, but when I run the job with a parallelism of 3, the prefetchCount is not working as expected. We establish a connection to RabbitMQ with a fixed setParallelism of 1. However, my other operators retrieve data from RabbitMQ and execute the job with a parallelism of 3, as shown in the following command. bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar ../config/app-config.properties -yD env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64 So kindly provide a solution for configuring the prefetch count with parallelism. Thanks, Ajay Pandey