RobertIndie commented on code in PR #1410:
URL: https://github.com/apache/pulsar-client-go/pull/1410#discussion_r2321808356


##########
pulsar/negative_acks_tracker.go:
##########
@@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
        targetTime := time.Now().Add(nackBackoffDelay)
-       t.negativeAcks[batchMsgID] = targetTime
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
t.nackPrecisionBit))
+       // try get trimmedTime
+       value, exists := t.negativeAcks.Get(trimmedTime)
+       if !exists {
+               newMap := make(map[LedgerID]*roaring64.Bitmap)
+               t.negativeAcks.Put(trimmedTime, newMap)
+               value = newMap
+       }
+       bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)
+       if _, exists := bitmapMap[batchMsgID.ledgerID]; !exists {
+               bitmapMap[batchMsgID.ledgerID] = roaring64.NewBitmap()
+       }
+       bitmapMap[batchMsgID.ledgerID].Add(uint64(batchMsgID.entryID))

Review Comment:
   Please reduce the duplicated code, since it's getting more complex.



##########
pulsar/consumer.go:
##########
@@ -183,6 +183,11 @@ type ConsumerOptions struct {
        // processed. Default is 1 min. (See `Consumer.Nack()`)
        NackRedeliveryDelay time.Duration
 
+       // NackPrecisionBit specifies the precision bit for nack redelivery 
delay.
+       // This is used to trim the lower bits of the nack redelivery delay to 
reduce memory usage.
+       // Default is 8 bits.
+       NackPrecisionBit int64

Review Comment:
   How do you set the default value `8`?



##########
pulsar/consumer_test.go:
##########
@@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) {
        // Failed messages should be resent
 
        // We should only receive the odd messages
-       for i := 1; i < N; i += 2 {
+       receivedOdd := 0
+       expectedOdd := (N + 1) / 2 // Expected number of odd message IDs
+
+       for receivedOdd < expectedOdd {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
-               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
 
+               // Extract message ID from the payload (e.g., "msg-content-15")
+               var id int
+               _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", 
&id)
+               assert.Nil(t, err)
+
+               // Count only odd message IDs
+               if id%2 == 1 {
+                       assert.True(t, id%2 == 1) // Optional check, included 
for clarity
+                       receivedOdd++
+               }
+
+               // Acknowledge message to mark it as processed
                consumer.Ack(msg)
        }
+
+       // Verify that the correct number of odd messages were received
+       assert.Equal(t, expectedOdd, receivedOdd)

Review Comment:
   What's the reason for these changes? After this change, the consumer seems 
to be expected to receive the even messages again, which seems incorrect.
   You can see that the test would still pass even though the consumer 
receives even messages, which is not expected:
   ```
   Receive redelivered message: msg-content-0
   Receive redelivered message: msg-content-1
   Receive redelivered message: msg-content-2
   Receive redelivered message: msg-content-3
   Receive redelivered message: msg-content-4
   Receive redelivered message: msg-content-5
   Receive redelivered message: msg-content-6
   Receive redelivered message: msg-content-7
   Receive redelivered message: msg-content-8
   Receive redelivered message: msg-content-9
   Receive redelivered message: msg-content-10
   Receive redelivered message: msg-content-11
   Receive redelivered message: msg-content-12
   Receive redelivered message: msg-content-13
   Receive redelivered message: msg-content-14
   Receive redelivered message: msg-content-15
   Receive redelivered message: msg-content-16
   ```
   
   <img width="1706" height="1076" alt="image" 
src="https://github.com/user-attachments/assets/f3ed9a60-9b45-4876-875b-8cda6e7b80bc"
 />
   



##########
pulsar/consumer_test.go:
##########
@@ -1185,6 +1185,7 @@ func TestConsumerNack(t *testing.T) {
                SubscriptionName:    "sub-1",
                Type:                Shared,
                NackRedeliveryDelay: 1 * time.Second,
+               NackPrecisionBit:    8,

Review Comment:
   We should add a test to verify the default value of `NackPrecisionBit`.



##########
pulsar/consumer_test.go:
##########
@@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) {
        // Failed messages should be resent
 
        // We should only receive the odd messages
-       for i := 1; i < N; i += 2 {
+       receivedOdd := 0
+       expectedOdd := (N + 1) / 2 // Expected number of odd message IDs
+
+       for receivedOdd < expectedOdd {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
-               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
 
+               // Extract message ID from the payload (e.g., "msg-content-15")
+               var id int
+               _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", 
&id)
+               assert.Nil(t, err)
+
+               // Count only odd message IDs
+               if id%2 == 1 {
+                       assert.True(t, id%2 == 1) // Optional check, included 
for clarity
+                       receivedOdd++
+               }
+
+               // Acknowledge message to mark it as processed
                consumer.Ack(msg)
        }
+
+       // Verify that the correct number of odd messages were received
+       assert.Equal(t, expectedOdd, receivedOdd)
+}
+
+func TestNegativeAckPrecisionBitCnt(t *testing.T) {
+       const delay = 1 * time.Second
+
+       for precision := 1; precision <= 8; precision++ {
+               topicName := 
fmt.Sprintf("testNegativeAckPrecisionBitCnt-%d-%d", precision, 
time.Now().UnixNano())
+               ctx := context.Background()
+               client, err := NewClient(ClientOptions{URL: lookupURL})
+               assert.Nil(t, err)
+               defer client.Close()
+
+               consumer, err := client.Subscribe(ConsumerOptions{
+                       Topic:               topicName,
+                       SubscriptionName:    "sub-1",
+                       Type:                Shared,
+                       NackRedeliveryDelay: delay,
+                       NackPrecisionBit:    int64(precision),
+               })
+               assert.Nil(t, err)
+               defer consumer.Close()
+
+               producer, err := client.CreateProducer(ProducerOptions{
+                       Topic: topicName,
+               })
+               assert.Nil(t, err)
+               defer producer.Close()
+
+               // Send single message
+               content := "test-0"
+               _, err = producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(content),
+               })
+               assert.Nil(t, err)
+
+               // Receive and send negative ack
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content, string(msg.Payload()))
+               consumer.Nack(msg)
+
+               // Calculate expected redelivery window
+               expectedRedelivery := time.Now().Add(delay)
+               deviation := time.Duration(int64(1)<<precision) * 
time.Millisecond
+
+               // Wait for redelivery
+               redelivered, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               assert.Equal(t, content, string(redelivered.Payload()))
+
+               now := time.Now()
+               // Assert that redelivery happens >= expected - deviation
+               assert.GreaterOrEqual(t, now.UnixMilli(), 
expectedRedelivery.UnixMilli()-deviation.Milliseconds())

Review Comment:
   We should also add a `LessOrEqual` assertion to ensure the redelivery is 
not delayed for too long.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to