Copilot commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2340346074
##########
pulsar/producer_partition.go:
##########
@@ -265,6 +267,10 @@ func (p *partitionProducer) lookupTopic(brokerServiceURL string) (*internal.Look
}
func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
Review Comment:
This condition allows `grabCnx` to be called while the producer is still in the
`producerInit` state, which could lead to unexpected behavior. The check
should return early unless the current state is `producerReady`, as in the
suggestion below.
```suggestion
func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
	state := p.getProducerState()
	if state != producerReady {
		// Only allow grabCnx when producer is in producerReady state
		return ErrProducerClosed
	}
```
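For illustration, here is a minimal, self-contained sketch of the guard pattern this comment describes. Every name in it (`producerState`, `stateInit`, `stateReady`, `errNotReady`, `producer`, `reconnect`) is a hypothetical stand-in and not part of pulsar-client-go. It also assumes the initial connection is established elsewhere, which the real producer may not do, so treat it as a sketch of the idea rather than a drop-in change.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// producerState is a hypothetical stand-in for the client's internal state type.
type producerState int32

const (
	stateInit producerState = iota // zero value: producer not yet ready
	stateReady
	stateClosed
)

// errNotReady is a hypothetical error, kept distinct from a "closed" error so
// callers can tell "not ready yet" apart from "already closed".
var errNotReady = errors.New("producer is not ready")

type producer struct {
	state atomic.Int32 // holds a producerState value
}

func (p *producer) getState() producerState  { return producerState(p.state.Load()) }
func (p *producer) setState(s producerState) { p.state.Store(int32(s)) }

// reconnect returns early unless the producer is in stateReady.
func (p *producer) reconnect() error {
	if s := p.getState(); s != stateReady {
		return fmt.Errorf("%w: state=%d", errNotReady, s)
	}
	// The actual connection work would go here.
	return nil
}

func main() {
	p := &producer{}
	fmt.Println(p.reconnect()) // rejected: still in stateInit

	p.setState(stateReady)
	fmt.Println(p.reconnect()) // allowed
}
```

Wrapping a dedicated sentinel with `%w` keeps the rejected state visible in logs while still letting callers match on it with `errors.Is`.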
##########
pulsar/producer_partition.go:
##########
@@ -385,31 +391,18 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
"epoch": atomic.LoadUint64(&p.epoch),
}).Info("Connected producer")
- pendingItems := p.pendingQueue.ReadableSlice()
- viewSize := len(pendingItems)
- if viewSize > 0 {
- p.log.Infof("Resending %d pending batches", viewSize)
- lastViewItem := pendingItems[viewSize-1].(*pendingItem)
-
- // iterate at most pending items
- for i := 0; i < viewSize; i++ {
- item := p.pendingQueue.Poll()
- if item == nil {
- continue
- }
- pi := item.(*pendingItem)
- // when resending pending batches, we update the sendAt timestamp to record the metric.
- pi.Lock()
- pi.sentAt = time.Now()
- pi.Unlock()
- pi.buffer.Retain() // Retain for writing to the connection
- p.pendingQueue.Put(pi)
- p._getConn().WriteData(pi.ctx, pi.buffer)
+ p.pendingQueue.Iterate(func(item any) {
+ pi := item.(*pendingItem)
+ // when resending pending batches, we update the sendAt timestamp to record the metric.
+ pi.Lock()
+ pi.sentAt = time.Now()
+ pi.Unlock()
+ pi.buffer.Retain() // Retain for writing to the connection
+ p._getConn().WriteData(pi.ctx, pi.buffer)
Review Comment:
The `Iterate` function processes all pending items without checking
connection state during iteration. If the connection fails during this process,
some messages might be sent to a closed connection. Consider checking
connection state within the iteration or handling potential write failures.
```suggestion
	cnx := p._getConn()
	if cnx == nil || !cnx.IsConnected() {
		p.log.Warn("Connection closed during pending queue iteration; stopping further sends")
		return
	}
	cnx.WriteData(pi.ctx, pi.buffer)
```
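One caveat about the suggestion above: the diff passes `Iterate` a callback of type `func(item any)`, so a `return` inside it ends only the call for the current item, and the remaining items are still visited. Below is a minimal sketch of an iteration that can actually stop early. The `conn` and `queue` types and the `iterateWhile` and `resend` helpers are hypothetical and are not the pulsar-client-go API.

```go
package main

import "fmt"

// conn is a hypothetical stand-in for a broker connection.
type conn struct {
	connected bool
	written   []string
}

// isConnected is safe to call on a nil *conn.
func (c *conn) isConnected() bool { return c != nil && c.connected }

func (c *conn) write(payload string) { c.written = append(c.written, payload) }

// queue is a hypothetical stand-in for the pending queue.
type queue struct{ items []string }

// iterateWhile visits items in order until fn returns false.
// It returns the number of items for which fn returned true.
func (q *queue) iterateWhile(fn func(item string) bool) int {
	n := 0
	for _, it := range q.items {
		if !fn(it) {
			break
		}
		n++
	}
	return n
}

// resend writes pending items and stops at the first item for which the
// connection is missing or no longer connected.
func resend(q *queue, getConn func() *conn) int {
	return q.iterateWhile(func(item string) bool {
		c := getConn()
		if !c.isConnected() {
			return false // stop the whole iteration, not just this item
		}
		c.write(item)
		return true
	})
}

func main() {
	c := &conn{connected: true}
	q := &queue{items: []string{"a", "b", "c"}}

	calls := 0
	sent := resend(q, func() *conn {
		calls++
		if calls == 3 {
			c.connected = false // simulate a drop before the third item
		}
		return c
	})
	fmt.Println(sent, c.written) // prints: 2 [a b]
}
```

Returning the number of items written lets the caller log or retry the remainder. In this sketch the unsent items stay in the queue, so a later reconnect can resend them.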
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.