RobertIndie opened a new pull request, #1422: URL: https://github.com/apache/pulsar-client-go/pull/1422
Fixes https://github.com/apache/pulsar-client-go/issues/1332

## Motivation

The root cause of issue #1332 is that the producer's main event loop becomes blocked during the reconnection process. While it is blocked, it cannot process new send requests from `dataChan`. Consequently, messages waiting in `dataChan` are never moved to the pending message queue, so they bypass the `SendTimeout` check. Furthermore, the `context` passed to `SendAsync` is not respected while the loop is blocked.

This behavior was introduced by PR #1029, which merged the reconnection logic and the data handling logic into a single event loop. The problem that https://github.com/apache/pulsar-client-go/pull/1029 addressed was that, during producer reconnection, resending old messages and sending new messages happened at the same time. This caused messages to be sent out of order, which led to the "received an ack larger than expected" error.

## Modifications

This PR refactors the producer's reconnection logic to remove the blocking while still preventing the original message-ordering problem. The key changes are:

1. **Separate reconnection and data handling logic**: The reconnection process now runs in its own dedicated goroutine (event loop). This keeps the main producer data loop unblocked, so it continues to process send requests even while a reconnection is in progress.
2. **Introduce the `producerConnecting` state**: A new state, `producerConnecting`, manages the producer's behavior during reconnection. In this state, new messages are added to the pending queue but are not sent immediately. This prevents new messages from being sent concurrently with messages resent from the previous connection, which fixes the out-of-order problem that #1029 addressed.
3. **Respect the `SendAsync` context**: The `ctx` passed to `SendAsync` is now respected, including while a reconnection is in progress.
### New Reconnection Flow

The new logic creates a clear separation of concerns between the two event loops:

| Reconnection event loop | Producer data event loop |
| --- | --- |
| Receive the connection-closed event | |
| | At this point, the connection is closed. Messages sent to a closed connection will be ignored and will be resent after reconnection. |
| Set the state to `producerConnecting`. Start reconnecting | |
| | At this point, new messages will only be pushed into the pending queue without being sent to the connection. |
| Connected to the broker, start resending pending messages | |
| | At this point, new messages are still prevented from being sent to the new connection until resending has finished. Otherwise, messages would be delivered out of order, which is the root cause of the problem addressed in [#1029](https://github.com/apache/pulsar-client-go/pull/1029). |
| Resend finishes. Set the state to `producerReady` | |
| | At this point, new messages will be sent to the new connection normally. |

---

## Documentation Updates

The documentation for `SendAsync` has been updated to clarify the distinct roles of the `context` and the `SendTimeout` configuration:

> The context passed in the `SendAsync` call is only used for the duration of the `SendAsync` call itself (i.e., to control blocking when the queue is full), and not for the entire message lifetime. Once `SendAsync` returns successfully, the message's lifetime is controlled by the `SendTimeout` configuration.
### Verifying this change

Added a new unit test, `TestSendAsyncCouldTimeoutWhileReconnecting` (based on the test from PR #1345), to verify that message timeouts and context cancellations work correctly during a producer reconnection cycle. Other unit tests were also added to cover the changes.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API: (yes / no)
- The schema: (yes / no / don't know)
- The default values of configurations: (yes / no)
- The wire protocol: (yes / no)

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
