Copilot commented on code in PR #1410:
URL: https://github.com/apache/pulsar-client-go/pull/1410#discussion_r2314250790
##########
pulsar/negative_acks_tracker.go:
##########
@@ -77,14 +102,20 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
t.Lock()
defer t.Unlock()
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
targetTime := time.Now().Add(t.delay)
- t.negativeAcks[batchMsgID] = targetTime
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
t.nackPrecisionBit))
+ // try get trimmedTime
+ value, exists := t.negativeAcks.Get(trimmedTime)
+ if !exists {
+ newMap := make(map[LedgerID]*roaring64.Bitmap)
+ t.negativeAcks.Put(trimmedTime, newMap)
+ value = newMap
+ }
+ bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)
Review Comment:
The type assertion `value.(map[LedgerID]*roaring64.Bitmap)` could panic if
the value is not of the expected type. Consider using safer type assertion with
error handling to prevent potential runtime panics.
```suggestion
bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)
if !ok {
t.log.Errorf("negativeAcksTracker: value for time %v is not of
expected type map[LedgerID]*roaring64.Bitmap", trimmedTime)
return
}
```
##########
pulsar/consumer_test.go:
##########
@@ -1216,13 +1217,86 @@ func TestConsumerNack(t *testing.T) {
// Failed messages should be resent
// We should only receive the odd messages
- for i := 1; i < N; i += 2 {
+ receivedOdd := 0
+ expectedOdd := (N + 1) / 2 // Expected number of odd message IDs
+
+ for receivedOdd < expectedOdd {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
- assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+ // Extract message ID from the payload (e.g., "msg-content-15")
+ var id int
+ _, err = fmt.Sscanf(string(msg.Payload()), "msg-content-%d",
&id)
+ assert.Nil(t, err)
+
+ // Count only odd message IDs
+ if id%2 == 1 {
+ assert.True(t, id%2 == 1) // Optional check, included
for clarity
Review Comment:
This assertion is redundant since it's checking the same condition as the if
statement on line 1233. The comment indicates it's optional, so it should be
removed to avoid unnecessary code.
```suggestion
```
##########
pulsar/negative_acks_tracker.go:
##########
@@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
t.Lock()
defer t.Unlock()
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
targetTime := time.Now().Add(nackBackoffDelay)
- t.negativeAcks[batchMsgID] = targetTime
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
t.nackPrecisionBit))
+ // try get trimmedTime
+ value, exists := t.negativeAcks.Get(trimmedTime)
+ if !exists {
+ newMap := make(map[LedgerID]*roaring64.Bitmap)
+ t.negativeAcks.Put(trimmedTime, newMap)
+ value = newMap
+ }
+ bitmapMap := value.(map[LedgerID]*roaring64.Bitmap)
Review Comment:
Same unsafe type assertion issue as in the Add method. Consider using
`value, ok := value.(map[LedgerID]*roaring64.Bitmap)` pattern with proper error
handling.
```suggestion
bitmapMap, ok := value.(map[LedgerID]*roaring64.Bitmap)
if !ok {
t.log.Error("negativeAcks value is not of type
map[LedgerID]*roaring64.Bitmap")
return
}
```
##########
pulsar/negative_acks_tracker.go:
##########
@@ -21,35 +21,52 @@ import (
"sync"
"time"
+ "github.com/RoaringBitmap/roaring/v2/roaring64"
log "github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/emirpasic/gods/trees/avltree"
)
type redeliveryConsumer interface {
Redeliver(msgIDs []messageID)
}
+type LedgerID = int64
+
type negativeAcksTracker struct {
sync.Mutex
- doneCh chan interface{}
- doneOnce sync.Once
- negativeAcks map[messageID]time.Time
- rc redeliveryConsumer
- nackBackoff NackBackoffPolicy
- tick *time.Ticker
- delay time.Duration
- log log.Logger
+ doneCh chan interface{}
+ doneOnce sync.Once
+ negativeAcks *avltree.Tree
+ nackPrecisionBit int64
+ rc redeliveryConsumer
+ nackBackoff NackBackoffPolicy
+ tick *time.Ticker
+ delay time.Duration
+ log log.Logger
}
func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
- nackBackoffPolicy NackBackoffPolicy, logger log.Logger)
*negativeAcksTracker {
+ nackBackoffPolicy NackBackoffPolicy, logger log.Logger,
nackPrecisionBit int64) *negativeAcksTracker {
t := &negativeAcksTracker{
- doneCh: make(chan interface{}),
- negativeAcks: make(map[messageID]time.Time),
- rc: rc,
- nackBackoff: nackBackoffPolicy,
- log: logger,
+ doneCh: make(chan interface{}),
+ negativeAcks: avltree.NewWith(func(a, b interface{}) int {
+ // compare time.Time
+ timeA := a.(time.Time)
+ timeB := b.(time.Time)
+ if timeA.Before(timeB) {
+ return -1
+ }
+ if timeA.After(timeB) {
+ return 1
+ }
+ return 0
+ }),
Review Comment:
The type assertions `a.(time.Time)` and `b.(time.Time)` could panic if the
wrong types are passed. Consider using safer type assertions with the `value,
ok := a.(time.Time)` pattern and handle the error case.
##########
pulsar/negative_acks_tracker.go:
##########
@@ -77,14 +102,20 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
t.Lock()
defer t.Unlock()
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
targetTime := time.Now().Add(t.delay)
- t.negativeAcks[batchMsgID] = targetTime
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
t.nackPrecisionBit))
Review Comment:
The logic for calculating trimmed time is duplicated between Add and
AddMessage methods. Consider extracting this into a helper method to reduce
code duplication and improve maintainability.
##########
pulsar/negative_acks_tracker.go:
##########
@@ -103,14 +134,20 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
t.Lock()
defer t.Unlock()
- _, present := t.negativeAcks[batchMsgID]
- if present {
- // The batch is already being tracked
- return
- }
-
targetTime := time.Now().Add(nackBackoffDelay)
- t.negativeAcks[batchMsgID] = targetTime
+ trimmedTime := time.UnixMilli(trimLowerBit(targetTime.UnixMilli(),
t.nackPrecisionBit))
Review Comment:
The logic for calculating trimmed time is duplicated between Add and
AddMessage methods. Consider extracting this into a helper method to reduce
code duplication and improve maintainability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]