Hi Ajay,

When you run the source with a parallelism of 3, you get 3 independent clients. If you want to keep the prefetch count at 3, set setRequestedChannelMax to 1 and setParallelism to 3, so that each of the 3 clients can have one connection.
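To make the arithmetic concrete, here is a rough sketch in plain Java (no Flink or RabbitMQ dependency). It assumes that every parallel source subtask opens its own connection and channel and applies the configured prefetch count to that channel, so the limits add up across subtasks. The class and method names (PrefetchBudget, maxInFlight, prefetchPerClient) are made up for illustration and are not part of any library.

```java
/** Back-of-the-envelope prefetch arithmetic for a parallel RabbitMQ source. */
final class PrefetchBudget {

    // Upper bound on unacknowledged messages across all source subtasks,
    // assuming each subtask has its own channel with its own prefetch limit.
    static int maxInFlight(int sourceParallelism, int prefetchPerClient) {
        if (sourceParallelism < 1 || prefetchPerClient < 1) {
            throw new IllegalArgumentException("both arguments must be >= 1");
        }
        return sourceParallelism * prefetchPerClient;
    }

    // Per-client prefetch that keeps the overall bound at or below totalBudget
    // when totalBudget >= sourceParallelism; otherwise it falls back to 1 per client.
    static int prefetchPerClient(int totalBudget, int sourceParallelism) {
        if (totalBudget < 1 || sourceParallelism < 1) {
            throw new IllegalArgumentException("both arguments must be >= 1");
        }
        return Math.max(1, totalBudget / sourceParallelism);
    }

    public static void main(String[] args) {
        System.out.println(maxInFlight(1, 3));       // one client, prefetch 3
        System.out.println(maxInFlight(3, 3));       // three clients, prefetch 3 each
        System.out.println(prefetchPerClient(3, 3)); // share a budget of 3 across 3 clients
    }
}
```

Under that assumption, a source parallelism of 3 with a prefetch count of 3 allows up to 9 unacknowledged messages in total. If the goal is 3 overall, a prefetch count of 1 per client would be the value to try.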

Talat

On Tue, May 7, 2024 at 5:52 AM ajay pandey <ajaypandey3...@gmail.com> wrote:

> Hi Flink Team,
>
> I am currently reading streaming data from RabbitMQ and using the
> RMQConnectionConfig for establishing the connection. Here's how I'm setting
> up the connection:
> and we use flink version 1.16.2 and RabbitMQ version 3.10.7
>
> RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
>         .setPrefetchCount(smsInput.prefetchCount)
>         .setHost(smsInput.HostServer)
>         .setPort(smsInput.HostPort)
>         .setUserName(smsInput.HostUserName)
>         .setPassword(smsInput.HostPassword)
>         .setVirtualHost("/")
>         .build();
>
> ConnectionFactory rabbitMQConnectionFactory =
>         connectionConfig.getConnectionFactory();
> rabbitMQConnectionFactory.setRequestedChannelMax(smsInput.prefetchCount);
> // Set prefetchcount
>
> DataStream<String> stream = executionEnvironment.addSource(new
>         RMQSource<String>(connectionConfig,
>                 smsInput.QueueName,
>                 new SimpleStringSchema()))
>         .setParallelism(1);
>
> Additionally, I have configured the prefetch count to read 3 data at the
> same time from RabbitMQ. Here's how I have enabled the checkpointing
> interval.
>
> executionEnvironment.enableCheckpointing(smsInput.checkpointIntervalMS,CheckpointingMode.EXACTLY_ONCE,true);
>
> The prefetch count seems to be working fine, but when I run the job with a
> parallelism of 3, the prefetchCount is not working as expected.
>
> We establish a connection to RabbitMQ with a fixed setParallelism of 1.
> However, my other operators retrieve data from RabbitMQ and execute the job
> with a parallelism of 3, as shown in the following command.
>
> bin/flink run -p 3 ../apps/Flink_1.16.2_prefetch.jar
> ../config/app-config.properties -yD
> env.java.home=/usr/lib/jvm/java-11-openjdk-11.0.19.0.7-1.el7_9.x86_64
>
> So kindly provide a solution for configuring the prefetch count with
> parallelism.
>
> Thanks,
> Ajay Pandey