RobertIndie opened a new pull request, #1422:
URL: https://github.com/apache/pulsar-client-go/pull/1422

   Fixes https://github.com/apache/pulsar-client-go/issues/1332
   
   ## Motivation
   
   The root cause of issue #1332 is that the producer's main event loop becomes 
blocked during the reconnection process. This prevents it from processing new 
send requests from the `dataChan`.
   
   Consequently, messages waiting in `dataChan` cannot be moved to the pending 
message queue, causing them to bypass the `SendTimeout` check. Furthermore, the 
`context` passed to `SendAsync` is not respected when the loop is blocked.
   
   This behavior was introduced by PR #1029, which merged the reconnection logic 
and the data handling logic into a single event loop. The issue addressed in 
https://github.com/apache/pulsar-client-go/pull/1029 was caused by resending 
messages and sending new messages at the same time during producer 
reconnection. This caused messages to be sent out of order, which led to the 
"received an ack larger than expected" error.
   
   ## Modifications
   
   This PR refactors the producer's reconnection logic to solve the blocking 
issue while also preventing the original message ordering problem.
   
   The key changes are:
   
   1.  **Separate Reconnection and Data Handling Logic**: The reconnection 
process has been moved into its own dedicated goroutine (event loop). This 
ensures the main producer data loop remains unblocked and can continuously 
process send requests, even when a reconnection is in progress.
   
   2.  **Introduce `producerConnecting` State**: A new state, 
`producerConnecting`, is introduced to manage the producer's behavior during 
reconnection. When in this state, new messages are added to the pending queue 
but are not immediately sent. This prevents new messages from being sent 
concurrently with resent messages from the previous connection, thus fixing the 
root cause of the out-of-order issue from #1029.
   
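   3.  **Respect `SendAsync` Context**: The `ctx` passed to `SendAsync` is now 
respected, so a call that is blocked waiting to enqueue a message returns when 
its context is cancelled or times out.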
   
   ### New Reconnection Flow
   
   The new logic creates a clear separation of concerns between the two event 
loops:
   
   | Reconnection event loop | Producer data event loop |
   | --- | --- |
   | Receive the connection-closed event. | |
   | | At this point, the connection is closed. Messages sent to a closed connection will be ignored and will be resent after reconnection. |
   | Set the state to `producerConnecting`. Start reconnecting. | |
   | | At this point, new messages will only be pushed into the pending queue without being sent to the connection. |
   | Connected to the broker. Start resending pending messages. | |
   | | At this point, new messages are still prevented from being sent to the new connection until resending has finished. Otherwise, messages would be delivered out of order, which was the root cause of the problem addressed in [#1029](https://github.com/apache/pulsar-client-go/pull/1029). |
   | Resend finishes. Set the state to `producerReady`. | |
   | | At this point, new messages will be sent to the new connection normally. |
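   The flow in the table can be modelled with a small Go sketch. It is a simplification under stated assumptions, not the PR's code: `producerState`, `internalSend`, `onConnectionClosed`, `onConnected`, and `run` are hypothetical names, a connection is just a slice recording what was written to it, and acks are not modelled, so nothing ever leaves `pending`. The one design choice it relies on is that the resend and the switch back to `producerReady` happen under the same lock the data loop takes when queuing a message.

```go
package main

import (
	"fmt"
	"sync"
)

// producerState mirrors the two states named in the PR (hypothetical values).
type producerState int

const (
	producerReady producerState = iota
	producerConnecting
)

// conn is a stand-in for one broker connection; it records written sequence IDs.
type conn struct {
	written []int64
}

type producer struct {
	mu      sync.Mutex
	state   producerState
	cnx     *conn
	pending []int64 // unacked sequence IDs, oldest first (acks not modelled)
	nextSeq int64
}

// internalSend is called by the producer data event loop for each request.
// The message always enters the pending queue, but it is written to the
// connection only while the producer is ready.
func (p *producer) internalSend() {
	p.mu.Lock()
	defer p.mu.Unlock()
	seq := p.nextSeq
	p.nextSeq++
	p.pending = append(p.pending, seq)
	if p.state == producerReady {
		p.cnx.written = append(p.cnx.written, seq)
	}
}

// onConnectionClosed is handled by the reconnection event loop.
func (p *producer) onConnectionClosed() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.state = producerConnecting
}

// onConnected resends every pending message on the new connection and only
// then switches back to producerReady, all under one lock, so a new message
// is either part of the resend or written after it, never in between.
func (p *producer) onConnected(newCnx *conn) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.cnx = newCnx
	p.cnx.written = append(p.cnx.written, p.pending...)
	p.state = producerReady
}

// run sends one message, drops the connection, then lets the data loop send
// n more messages while the reconnection loop reconnects concurrently.
// It returns what was written to the new connection.
func run(n int) []int64 {
	p := &producer{state: producerReady, cnx: &conn{}}
	p.internalSend()       // seq 0 goes out on the original connection
	p.onConnectionClosed() // the original connection is lost

	newCnx := &conn{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // producer data event loop
		defer wg.Done()
		for i := 0; i < n; i++ {
			p.internalSend()
		}
	}()
	go func() { // reconnection event loop
		defer wg.Done()
		p.onConnected(newCnx)
	}()
	wg.Wait()
	return newCnx.written
}

func main() {
	fmt.Println(run(5)) // [0 1 2 3 4 5], whatever the goroutine interleaving
}
```

In this sketch the lock is held only while resending, not while dialing the broker, so the data loop keeps queuing requests during `producerConnecting`; the real implementation may enforce the same ordering through a different mechanism.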
   
   
   ---
   
   ## Documentation Updates
   
   The documentation for `SendAsync` has been updated to clarify the distinct 
roles of the `context` and the `SendTimeout` configuration:
   
   > The context passed in the `SendAsync` call is only used for the duration 
of the `SendAsync` call itself (i.e., to control blocking when the queue is 
full), and not for the entire message lifetime. Once `SendAsync` returns 
successfully, the message's lifetime is controlled by the `SendTimeout` 
configuration.
   
   
   ### Verifying this change
   
   Added a new unit test, `TestSendAsyncCouldTimeoutWhileReconnecting` (based 
on the test from PR #1345), to verify that message timeouts and context 
cancellations work correctly during a producer reconnection cycle. Other unit 
tests were also added to cover the changes.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes / no)
     - The public API: (yes / no)
     - The schema: (yes / no / don't know)
     - The default values of configurations: (yes / no)
     - The wire protocol: (yes / no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes / no)
     - If yes, how is the feature documented? (not applicable / docs / GoDocs / 
not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a followup 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.