Hi Pulsar Community,

I discovered a race condition in Pulsar’s Java Client ProducerImpl
that can lead to messages persisted out-of-order for a single producer
sending to a non-partitioned topic. I can reproduce this issue, and I
confirmed the misordering by embedding a sequence id in each message
payload before calling `producer.send`. I opened a PR to fix the race [0]
and another to improve the broker’s behavior [1].
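To make the ordering check concrete, here is a minimal sketch of the
verification step. It assumes each payload carries a sequence id that
increases with every `producer.send` call, and that the ids are read back
from the topic in persisted order. `OrderCheck` and `firstOutOfOrder` are
hypothetical names, and the sketch does not use the Pulsar client API.

```java
import java.util.List;

// Hypothetical helper (not part of Pulsar): checks that sequence ids read
// back from a topic, in persisted order, are strictly increasing as sent.
final class OrderCheck {
    private OrderCheck() {}

    // Returns the index of the first id that is not greater than its
    // predecessor (out of order or duplicated), or -1 if the list is ordered.
    static int firstOutOfOrder(List<Long> persistedIds) {
        for (int i = 1; i < persistedIds.size(); i++) {
            if (persistedIds.get(i) <= persistedIds.get(i - 1)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstOutOfOrder(List.of(0L, 1L, 2L, 3L))); // -1
        System.out.println(firstOutOfOrder(List.of(2L, 3L, 0L, 1L))); // 2
    }
}
```

A strict "greater than" check flags both reordering and duplicates, which
is useful because reconnects can produce either.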

At a high level, the ProducerImpl can get into a corrupt state if it
switches connections too quickly. In this corrupt state, the producer
can send messages before, during, and after the producer is registered
with the broker. Because the broker ignores messages until a producer is
created for the ServerCnx, some of the early messages are silently
dropped, while some later ones, sent once the producer is created, are
persisted.
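The failure mode can be illustrated with a toy model of a connection that
ignores Send commands for a producer it has not registered yet.
`IgnoringCnx` is hypothetical and is not Pulsar's ServerCnx; the final
retry of the ignored messages is an assumption added only to show how they
can end up persisted after later ones.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (not Pulsar code): a connection that silently ignores Send
// commands for producers that are not registered on it yet.
final class IgnoringCnx {
    private final Set<Long> registeredProducers = new HashSet<>();
    private final List<Long> persisted = new ArrayList<>();

    void registerProducer(long producerId) {
        registeredProducers.add(producerId);
    }

    void send(long producerId, long sequenceId) {
        if (!registeredProducers.contains(producerId)) {
            return; // ignored: nothing is persisted and no error is returned
        }
        persisted.add(sequenceId);
    }

    List<Long> persisted() {
        return persisted;
    }

    // Replays sends before and after registration, then (assumption)
    // retries the two messages that were ignored.
    static List<Long> simulate() {
        long producerId = 1L;
        IgnoringCnx cnx = new IgnoringCnx();
        cnx.send(producerId, 0L);          // before registration: ignored
        cnx.send(producerId, 1L);          // before registration: ignored
        cnx.registerProducer(producerId);  // producer created on the broker
        cnx.send(producerId, 2L);          // persisted
        cnx.send(producerId, 3L);          // persisted
        cnx.send(producerId, 0L);          // hypothetical retry: persisted late
        cnx.send(producerId, 1L);          // hypothetical retry: persisted late
        return cnx.persisted();
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [2, 3, 0, 1]
    }
}
```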

In PR [1], I propose that when a broker receives an unexpected message
(a Send command for a producer that is not registered on the
connection), it should close the connection instead of simply ignoring
the message. This protects against clients that are not following the
protocol. The protocol already states that clients must register
producers before they start sending messages [2], but it does not state
what happens if a client does not follow this part of the protocol.
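For comparison, here is a toy model of the proposed rule: a Send for an
unregistered producer closes the connection, so nothing sent after the
violation is persisted on that connection. `StrictCnx` is hypothetical and
is not the code in PR [1]. It assumes the client then reconnects,
registers the producer first, and resends its pending messages in order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (not Pulsar code) of the proposal: a Send command for a
// producer that is not registered on this connection closes the connection.
final class StrictCnx {
    private final Set<Long> registeredProducers = new HashSet<>();
    private final List<Long> persisted = new ArrayList<>();
    private boolean closed = false;

    void registerProducer(long producerId) {
        if (!closed) {
            registeredProducers.add(producerId);
        }
    }

    void send(long producerId, long sequenceId) {
        if (closed) {
            return; // a closed connection processes no further commands
        }
        if (!registeredProducers.contains(producerId)) {
            closed = true; // protocol violation: close instead of ignoring
            return;
        }
        persisted.add(sequenceId);
    }

    List<Long> persisted() {
        return persisted;
    }

    static List<Long> simulate() {
        long producerId = 1L;
        StrictCnx first = new StrictCnx();
        first.send(producerId, 0L);          // unexpected Send: connection closed
        first.send(producerId, 1L);          // dropped with the connection
        first.registerProducer(producerId);  // too late, connection is closed
        first.send(producerId, 2L);          // dropped with the connection
        first.send(producerId, 3L);          // dropped with the connection

        List<Long> all = new ArrayList<>(first.persisted());
        StrictCnx second = new StrictCnx();  // assumption: client reconnects
        second.registerProducer(producerId); // registers first, per protocol
        for (long seq = 0L; seq <= 3L; seq++) {
            second.send(producerId, seq);    // resends pending messages in order
        }
        all.addAll(second.persisted());
        return all;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [0, 1, 2, 3]
    }
}
```

Closing the connection turns a silent gap into a hard failure, which
pushes the client back onto the register-then-send path before anything
from that burst can be persisted.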

One tradeoff of this approach is that when the broker initiates
closing a producer, there is a chance that the whole connection will
get closed if the producer has messages in flight. I think this is a
reasonable tradeoff to ensure that clients not following the protocol
cannot persist messages out-of-order.

From my perspective, this is the simplest solution that will ensure
message order is preserved. Alternatively, we could come up with logic
to try to handle messages sent to "recently" closed producers, but
that would greatly increase the complexity for this edge case. Note
that it is not sufficient to reply to each message with a SendError
because the producer may have already sent later messages and those
could be persisted if the producer is concurrently being created. Note
also that when the Java Client producer receives a generic SendError,
it reacts by closing the connection in most cases.

I include more detail in each of the PRs. I look forward to your feedback.

Thanks,
Michael

[0] - https://github.com/apache/pulsar/pull/12779
[1] - https://github.com/apache/pulsar/pull/12780
[2] - https://pulsar.apache.org/docs/en/develop-binary-protocol/#producer
