BewareMyPower commented on code in PR #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422#discussion_r2341027356


##########
pulsar/producer_partition.go:
##########
@@ -902,6 +904,14 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
                now := time.Now()
                ctx, cancel := context.WithCancel(context.Background())
                buffer.Retain()
+               conn := p._getConn()
+               if p.getProducerState() == producerReady {
+                       // If the producer is reconnecting, we should not write to the connection.
+                       // We just need to push the buffer to the pending queue, it will be sent during the reconnecting.
+                       conn.WriteData(ctx, buffer)
+               } else {
+                       p.log.Debug("Skipping write to connection, producer state: ", p.getProducerState())
+               }

Review Comment:
   There is still a race:
   
   1. Goroutine A: `reconnectToBroker` is called and the state becomes `producerConnecting`.
   2. Goroutine B: `msg-0` is passed to `internalSend` and added to `pendingQueue`.
   3. Goroutine A: `grabCnx` finishes and `pendingQueue.Iterate` resends `msg-0` to the broker.
   4. Goroutine B: `msg-1` is passed to `internalSend`. `p.getProducerState()` still returns `producerConnecting`, so the write is skipped and `msg-1` is only added to `pendingQueue` afterwards.
   5. Goroutine A: calls `casProducerState(producerConnecting, producerReady)` to update the state to `producerReady`.
   6. Goroutine B: `msg-2` is passed to `internalSend` and sent to the broker because the state is now ready.
   
   As a result, `msg-1` misses the resend in step 3 and is never written directly, so it won't be sent until the next reconnection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
