Copilot commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2340346074


##########
pulsar/producer_partition.go:
##########
@@ -265,6 +267,10 @@ func (p *partitionProducer) lookupTopic(brokerServiceURL 
string) (*internal.Look
 }
 
 func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {

Review Comment:
   This condition allows `grabCnx` to be called when the producer is in 
`producerInit` state, which could lead to unexpected behavior. The condition 
should check if the current state is not `producerReady` or `producerInit` 
before proceeding.
   ```suggestion
   func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
        state := p.getProducerState()
        if state != producerReady {
                // Only allow grabCnx when producer is in producerReady state
                return ErrProducerClosed
        }
   ```



##########
pulsar/producer_partition.go:
##########
@@ -385,31 +391,18 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL 
string) error {
                "epoch": atomic.LoadUint64(&p.epoch),
        }).Info("Connected producer")
 
-       pendingItems := p.pendingQueue.ReadableSlice()
-       viewSize := len(pendingItems)
-       if viewSize > 0 {
-               p.log.Infof("Resending %d pending batches", viewSize)
-               lastViewItem := pendingItems[viewSize-1].(*pendingItem)
-
-               // iterate at most pending items
-               for i := 0; i < viewSize; i++ {
-                       item := p.pendingQueue.Poll()
-                       if item == nil {
-                               continue
-                       }
-                       pi := item.(*pendingItem)
-                       // when resending pending batches, we update the sendAt 
timestamp to record the metric.
-                       pi.Lock()
-                       pi.sentAt = time.Now()
-                       pi.Unlock()
-                       pi.buffer.Retain() // Retain for writing to the 
connection
-                       p.pendingQueue.Put(pi)
-                       p._getConn().WriteData(pi.ctx, pi.buffer)
+       p.pendingQueue.Iterate(func(item any) {
+               pi := item.(*pendingItem)
+               // when resending pending batches, we update the sendAt 
timestamp to record the metric.
+               pi.Lock()
+               pi.sentAt = time.Now()
+               pi.Unlock()
+               pi.buffer.Retain() // Retain for writing to the connection
+               p._getConn().WriteData(pi.ctx, pi.buffer)

Review Comment:
   The `Iterate` function processes all pending items without checking 
connection state during iteration. If the connection fails during this process, 
some messages might be sent to a closed connection. Consider checking 
connection state within the iteration or handling potential write failures.
   ```suggestion
                cnx := p._getConn()
                if cnx == nil || !cnx.IsConnected() {
                        p.log.Warn("Connection closed during pending queue 
iteration; stopping further sends")
                        return
                }
                cnx.WriteData(pi.ctx, pi.buffer)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to