Hi Pulsar Community,

I discovered a race condition in Pulsar’s Java Client ProducerImpl that can lead to messages being persisted out of order for a single producer sending to a non-partitioned topic. I can reproduce this issue, and I verified the order by adding sequence ids to the message payload before calling `producer.send`. I opened a PR to fix the race [0] and another to improve the broker’s behavior [1].
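The ordering check described above can be sketched roughly as follows. This is an illustration only, not the code used for the reproduction: the class `SequenceOrderCheck` and its methods are hypothetical names, and the sketch assumes each payload begins with an 8-byte sequence id written before the payload is handed to `producer.send`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SequenceOrderCheck {

    // Hypothetical helper: prefix the message body with an 8-byte sequence id.
    public static byte[] encode(long sequenceId, byte[] body) {
        return ByteBuffer.allocate(Long.BYTES + body.length)
                .putLong(sequenceId)
                .put(body)
                .array();
    }

    // Read the sequence id back from the first 8 bytes of a payload.
    public static long decodeSequenceId(byte[] payload) {
        return ByteBuffer.wrap(payload).getLong();
    }

    // Returns the index of the first payload whose sequence id is not greater
    // than its predecessor's, or -1 if the ids are strictly increasing.
    public static int firstOutOfOrderIndex(List<byte[]> payloads) {
        long previous = Long.MIN_VALUE;
        for (int i = 0; i < payloads.size(); i++) {
            long current = decodeSequenceId(payloads.get(i));
            if (i > 0 && current <= previous) {
                return i;
            }
            previous = current;
        }
        return -1;
    }

    public static void main(String[] args) {
        // Payloads listed in the order they were read back from the topic (made-up data).
        List<byte[]> persisted = new ArrayList<>();
        for (long id : new long[] {0, 1, 4, 2, 3}) {
            persisted.add(encode(id, new byte[0]));
        }
        System.out.println(firstOutOfOrderIndex(persisted)); // prints 3
    }
}
```

In a real test, the list would be filled from the payloads returned by a consumer or reader on the topic, in the order they were received.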
At a high level, the ProducerImpl can get into a corrupt state if it switches connections too quickly. In this corrupt state, the producer can send messages before, during, and after the producer is registered with the broker. Because the broker ignores messages until a producer is created for the ServerCnx, some of the early messages are ignored and, once the producer is created, some later ones are persisted.

In PR [1], I propose that when a broker gets an unexpected message (Send command), it should close the connection to protect against clients that are not following the protocol, instead of simply ignoring unexpected messages. The protocol already states that clients are to register producers and then start sending messages [2]. It does not state what happens if a client does not follow this part of the protocol.

One tradeoff for this implementation is that when the broker initiates closing a producer, there is a chance that the whole connection will get closed if the producer has messages in flight. I think this is a reasonable tradeoff to ensure that clients not following the protocol are not able to persist messages out of order.

From my perspective, this is the simplest solution that will ensure message order is preserved. Alternatively, we could come up with logic to try to handle messages sent to "recently" closed producers, but that would greatly increase the complexity for this edge case.

Note that it is not sufficient to reply to each message with a SendError because the producer may have already sent later messages, and those could be persisted if the producer is concurrently being created. Note also that when the Java Client producer receives a generic SendError, it reacts by closing the connection in most cases.

I include more detail in each of the PRs. I look forward to your feedback.

Thanks,
Michael

[0] - https://github.com/apache/pulsar/pull/12779
[1] - https://github.com/apache/pulsar/pull/12780
[2] - https://pulsar.apache.org/docs/en/develop-binary-protocol/#producer