Copilot commented on code in PR #1410:
URL: https://github.com/apache/pulsar-client-go/pull/1410#discussion_r2314250790


##########
pulsar/negative_acks_tracker.go:
##########
@@ -77,14 +102,20 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
        targetTime := time.Now().Add(t.delay)
-       t.negativeAcks[batchMsgID] = targetTime
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
t.nackPrecisionBit))
+       // try get trimmedTime
+       value, exists := t.negativeAcks.Get(trimmedTime)
+       if !exists {
+               newMap := make(map[LedgerID]*roaring64.Bitmap)
+               t.negativeAcks.Put(trimmedTime, newMap)
+               value = newMap
+       }
+       bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)

Review Comment:
   The type assertion `value.(map[LedgerID]*roaring64.Bitmap)` could panic if 
the value is not of the expected type. Consider using a safer comma-ok type 
assertion with error handling to prevent potential runtime panics.
   ```suggestion
        bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)
        if !ok {
                t.log.Errorf("negativeAcksTracker: value for time %v is not of 
expected type map[LedgerID]*roaring64.Bitmap", trimmedTime)
                return
        }
   ```



##########
pulsar/consumer_test.go:
##########
@@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) {
        // Failed messages should be resent
 
        // We should only receive the odd messages
-       for i := 1; i < N; i += 2 {
+       receivedOdd := 0
+       expectedOdd := (N + 1) / 2 // Expected number of odd message IDs
+
+       for receivedOdd < expectedOdd {
                msg, err := consumer.Receive(ctx)
                assert.Nil(t, err)
-               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), 
string(msg.Payload()))
 
+               // Extract message ID from the payload (e.g., "msg-content-15")
+               var id int
+               _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d", 
&id)
+               assert.Nil(t, err)
+
+               // Count only odd message IDs
+               if id%2 == 1 {
+                       assert.True(t, id%2 == 1) // Optional check, included 
for clarity

Review Comment:
   This assertion is redundant since it's checking the same condition as the if 
statement on line 1233. The comment indicates it's optional, so it should be 
removed to avoid unnecessary code.
   ```suggestion
   
   ```



##########
pulsar/negative_acks_tracker.go:
##########
@@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
        targetTime := time.Now().Add(nackBackoffDelay)
-       t.negativeAcks[batchMsgID] = targetTime
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
t.nackPrecisionBit))
+       // try get trimmedTime
+       value, exists := t.negativeAcks.Get(trimmedTime)
+       if !exists {
+               newMap := make(map[LedgerID]*roaring64.Bitmap)
+               t.negativeAcks.Put(trimmedTime, newMap)
+               value = newMap
+       }
+       bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)

Review Comment:
   Same unsafe type assertion issue as in the Add method. Consider using the 
`bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)` pattern with proper 
error handling.
   ```suggestion
        bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)
        if !ok {
                t.log.Error("negativeAcks value is not of type 
map[LedgerID]*roaring64.Bitmap")
                return
        }
   ```



##########
pulsar/negative_acks_tracker.go:
##########
@@ -21,35 +21,52 @@ import (
        "sync"
        "time"
 
+       "github.com/RoaringBitmap/roaring/v2/roaring64"
        log "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/emirpasic/gods/trees/avltree"
 )
 
 type redeliveryConsumer interface {
        Redeliver(msgIDs []messageID)
 }
 
+type LedgerID = int64
+
 type negativeAcksTracker struct {
        sync.Mutex
 
-       doneCh       chan interface{}
-       doneOnce     sync.Once
-       negativeAcks map[messageID]time.Time
-       rc           redeliveryConsumer
-       nackBackoff  NackBackoffPolicy
-       tick         *time.Ticker
-       delay        time.Duration
-       log          log.Logger
+       doneCh           chan interface{}
+       doneOnce         sync.Once
+       negativeAcks     *avltree.Tree
+       nackPrecisionBit int64
+       rc               redeliveryConsumer
+       nackBackoff      NackBackoffPolicy
+       tick             *time.Ticker
+       delay            time.Duration
+       log              log.Logger
 }
 
 func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
-       nackBackoffPolicy NackBackoffPolicy, logger log.Logger) 
*negativeAcksTracker {
+       nackBackoffPolicy NackBackoffPolicy, logger log.Logger, 
nackPrecisionBit int64) *negativeAcksTracker {
 
        t := &negativeAcksTracker{
-               doneCh:       make(chan interface{}),
-               negativeAcks: make(map[messageID]time.Time),
-               rc:           rc,
-               nackBackoff:  nackBackoffPolicy,
-               log:          logger,
+               doneCh: make(chan interface{}),
+               negativeAcks: avltree.NewWith(func(a, b interface{}) int {
+                       // compare time.Time
+                       timeA := a.(time.Time)
+                       timeB := b.(time.Time)
+                       if timeA.Before(timeB) {
+                               return -1
+                       }
+                       if timeA.After(timeB) {
+                               return 1
+                       }
+                       return 0
+               }),

Review Comment:
   The type assertions `a.(time.Time)` and `b.(time.Time)` could panic if the 
wrong types are passed. Consider using safer type assertions with the `timeA, 
ok := a.(time.Time)` pattern and handling the error case.



##########
pulsar/negative_acks_tracker.go:
##########
@@ -77,14 +102,20 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
        targetTime := time.Now().Add(t.delay)
-       t.negativeAcks[batchMsgID] = targetTime
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
t.nackPrecisionBit))

Review Comment:
   The logic for calculating trimmed time is duplicated between Add and 
AddMessage methods. Consider extracting this into a helper method to reduce 
code duplication and improve maintainability.



##########
pulsar/negative_acks_tracker.go:
##########
@@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgID]
-       if present {
-               // The batch is already being tracked
-               return
-       }
-
        targetTime := time.Now().Add(nackBackoffDelay)
-       t.negativeAcks[batchMsgID] = targetTime
+       trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(), 
t.nackPrecisionBit))

Review Comment:
   The logic for calculating trimmed time is duplicated between Add and 
AddMessage methods. Consider extracting this into a helper method to reduce 
code duplication and improve maintainability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to