2020-04-30 10:47:28 UTC - Avimas: Hi guys,
I started working with the `batchReceive` method in the Java client 2.5.1, and I think there are a few issues in this area. I'd like to get your feedback on them.

1. The batch size is not limited to the minimum of `maxNumberOfMessages` and `maxSizeOfMessages` from the `BatchReceivePolicy`.
I think this issue can easily be fixed by changing the `canAdd` function in `MessagesImpl` from:

```
protected boolean canAdd(Message<T> message) {
    if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
        // No limits configured: every message can be added.
        return true;
    } else {
        // With ||, satisfying either limit is enough, so the batch can grow past the other one.
        return this.maxNumberOfMessages > 0
                && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages
                || this.maxSizeOfMessages > 0L
                && this.currentSizeOfMessages + (long) message.getData().length <= this.maxSizeOfMessages;
    }
}
```

to (changing the condition in the `else` branch to `&&` instead of `||`):

```
protected boolean canAdd(Message<T> message) {
    if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
        return true;
    } else {
        return (this.maxNumberOfMessages > 0
                    && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages)
                && (this.maxSizeOfMessages > 0L
                    && this.currentSizeOfMessages + (long) message.getData().length <= this.maxSizeOfMessages);
    }
}
```
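A caveat on the `&&` version as written: each side still requires its own limit to be positive, so if only one limit is configured (for example `maxSizeOfMessages <= 0` with `maxNumberOfMessages > 0`), the second operand is always false and no message can ever be added. Below is a minimal standalone sketch of the intended "stop at whichever limit is hit first" semantics, assuming, as the original early return does, that a limit `<= 0` means "not set". `BatchLimits` is a hypothetical class, not Pulsar's `MessagesImpl`, and accepting the first message unconditionally (so a single oversized payload cannot block the batch) is a design assumption of this sketch.

```java
// Hypothetical stand-in for the limit check in MessagesImpl; not Pulsar code.
class BatchLimits {
    private final int maxNumberOfMessages; // <= 0 means "no count limit"
    private final long maxSizeOfMessages;  // <= 0 means "no size limit"
    private int currentNumberOfMessages = 0;
    private long currentSizeOfMessages = 0L;

    BatchLimits(int maxNumberOfMessages, long maxSizeOfMessages) {
        this.maxNumberOfMessages = maxNumberOfMessages;
        this.maxSizeOfMessages = maxSizeOfMessages;
    }

    /** Returns true if a message with the given payload size fits within every configured limit. */
    boolean canAdd(long messageSize) {
        if (currentNumberOfMessages == 0) {
            // Assumption of this sketch: always accept the first message so one
            // oversized payload cannot leave the batch empty forever.
            return true;
        }
        // A limit that is not set (<= 0) is treated as satisfied.
        boolean countOk = maxNumberOfMessages <= 0
                || currentNumberOfMessages + 1 <= maxNumberOfMessages;
        boolean sizeOk = maxSizeOfMessages <= 0L
                || currentSizeOfMessages + messageSize <= maxSizeOfMessages;
        // Both limits must hold, so the batch stops at whichever limit is reached first.
        return countOk && sizeOk;
    }

    void add(long messageSize) {
        currentNumberOfMessages++;
        currentSizeOfMessages += messageSize;
    }

    int size() {
        return currentNumberOfMessages;
    }

    public static void main(String[] args) {
        // Count limit 3, size limit 250 bytes, 100-byte messages: the size limit is hit first.
        BatchLimits batch = new BatchLimits(3, 250L);
        while (batch.canAdd(100L)) {
            batch.add(100L);
        }
        System.out.println(batch.size()); // 2

        // Only a count limit configured: the proposed && expression would reject every message here.
        BatchLimits countOnly = new BatchLimits(3, 0L);
        while (countOnly.canAdd(100L)) {
            countOnly.add(100L);
        }
        System.out.println(countOnly.size()); // 3
    }
}
```

Treating an unset limit as "satisfied" keeps the `&&` combination from the proposal while removing the case where configuring just one of the two limits blocks the batch entirely.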

2. When the batch size is larger than the consumer's receiver queue (I used a batch size of 3000 and a receiver queue size of 500), I noticed the following issues:

   a. In a multi-topic (pattern) consumer, the client stops receiving messages when a timeout is set in the batch policy: only one batch is fetched and the client never resumes. I think the consumer gets paused and is never resumed.

   b. In a single-topic consumer, I get three batches of 3000, which I think is very bad considering the receiver queue size is 500, and after that the client stops receiving any traffic.


Do you think I should open issues for this?
Are there any limitations I am not aware of?
----
2020-04-30 12:58:33 UTC - Penghui Li: Hi @Avimas, thanks for your feedback. You can open issues for these two problems; other users may run into them too, and this way they can find them in the GitHub issues. For the first problem, the change looks good, and a pull request is welcome. I will take a look at the second issue later.
----
2020-04-30 14:28:36 UTC - Avimas: Great, thanks! I will create a pull request for the first problem and open issues for the others.
----
2020-04-30 14:57:08 UTC - Penghui Li: @Avimas, I ran a test for the second issue (single-topic consumer) but can't reproduce it. I'm working on the master branch:
```
    @Test
    public void test() throws PulsarClientException {
        final String topic = "persistent://my-property/my-ns/test";

        // Producer floods the topic; blockIfQueueFull(true) makes sendAsync block instead of failing.
        Producer<byte[]> producer =
                pulsarClient.newProducer().topic(topic).blockIfQueueFull(true).create();
        new Thread(() -> {
            while (true) {
                producer.sendAsync("".getBytes());
            }
        }).start();

        // Batch size (5000) deliberately larger than the receiver queue (100), with a 1-second timeout.
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .receiverQueueSize(100)
                .subscriptionName("test")
                .batchReceivePolicy(BatchReceivePolicy.builder()
                        .maxNumMessages(5000)
                        .timeout(1, TimeUnit.SECONDS)
                        .build())
                .subscribe();

        // Print the size of every batch received.
        while (true) {
            Messages<byte[]> msg = consumer.batchReceive();
            System.out.println(msg.size());
        }
    }
```
----
2020-04-30 14:57:08 UTC - Penghui Li:
```
5000
5000
2993
977
5000
5000
5000
5000
1790
5000
5000
5000
5000
5000
5000
1136
5000
5000
5000
5000
5000
5000
5000
5000
5000
5000
```
----
2020-04-30 14:57:37 UTC - Penghui Li: I'm not sure if I missed something.
----
2020-04-30 15:09:26 UTC - Avimas: Try removing the timeout.
----
2020-04-30 15:10:40 UTC - Avimas: Notice that the batch size is greater than the receiver queue size, which I assume is not good.
----
2020-04-30 15:22:32 UTC - Avimas: @Penghui Li this is my producer configuration
----
2020-04-30 15:22:49 UTC - Avimas:
```
clientProviderForTests.getPulsarClient()
        .newProducer()
        .blockIfQueueFull(true)
        .maxPendingMessages(2000)
        .enableBatching(true)
        .batchingMaxMessages(2000)
        .sendTimeout(30, TimeUnit.SECONDS)
        .topic("public/default/test").create();
```

----
2020-04-30 15:24:06 UTC - Avimas: and this is my consumer:
----
2020-04-30 15:24:14 UTC - Avimas:
```
consumer = clientProviderForTests.getPulsarClient().newConsumer()
        //.subscriptionType(SubscriptionType.Shared)
        .subscriptionName("disconnections3")
        .topic("public/default/test")
        .receiverQueueSize(500)
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumBytes(1024 * 1024)
                .maxNumMessages(3000)
                .build())
        .subscribe();
```

----
2020-04-30 15:24:53 UTC - Avimas: Let me know if the issue reproduces.
----
2020-04-30 15:57:47 UTC - Avimas: @Penghui Li I just verified that the issue reproduces in my environment when the timeout is omitted.
----
2020-04-30 18:25:34 UTC - Greg Methvin: I also noticed something similar to (2), even without batch receive, when `receiverQueueSize` is less than `batchingMaxMessages`. With 10 consumers, only one or two might actually be processing messages while the rest were "stuck". I learned the hard way to always set `receiverQueueSize > batchingMaxMessages`.
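The two sizing rules from this thread (a receiver queue larger than the producer's `batchingMaxMessages`, and a batch-receive `maxNumMessages` no larger than the receiver queue) can be checked before the clients are built. A minimal sketch, assuming those rules exactly as stated here rather than any documented Pulsar limit; `QueueSizing` and `sizingProblems` are hypothetical names, and the values are passed as plain ints instead of being read from Pulsar's builders. It is a configuration guard, not a fix for the underlying bug.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check based on the sizing rules discussed in this thread; not a Pulsar API.
class QueueSizing {

    static List<String> sizingProblems(int receiverQueueSize,
                                       int producerBatchingMaxMessages,
                                       int batchReceiveMaxNumMessages) {
        List<String> problems = new ArrayList<>();
        // Rule of thumb from this thread: keep the receiver queue strictly larger than a producer batch.
        if (receiverQueueSize <= producerBatchingMaxMessages) {
            problems.add("receiverQueueSize (" + receiverQueueSize
                    + ") should be greater than batchingMaxMessages (" + producerBatchingMaxMessages + ")");
        }
        // A batchReceive batch larger than the receiver queue was seen to stall consumers.
        if (batchReceiveMaxNumMessages > receiverQueueSize) {
            problems.add("batch receive maxNumMessages (" + batchReceiveMaxNumMessages
                    + ") exceeds receiverQueueSize (" + receiverQueueSize + ")");
        }
        return problems;
    }

    public static void main(String[] args) {
        // The configuration from this thread: queue 500, producer batches of 2000, batch receive of 3000.
        sizingProblems(500, 2000, 3000).forEach(System.out::println);
        // A configuration that satisfies both rules reports nothing.
        System.out.println(sizingProblems(5000, 2000, 3000).isEmpty()); // true
    }
}
```

For the producer and consumer configuration posted above, both rules are violated, so the check reports two problems.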
----
2020-04-30 18:49:24 UTC - Avimas: Yes, it is a good workaround, but I think it hides a bigger issue.
----
2020-05-01 01:16:39 UTC - Penghui Li: @Avimas Oh, I see. The problem is related to the flow permit control when you set `batchingMaxMessages` greater than `receiverQueueSize`. I have created an issue <https://github.com/apache/pulsar/issues/6854> and will fix it later.
----
2020-05-01 05:23:58 UTC - Avimas: Thanks @Penghui Li 
----