Chickenzilla commented on a change in pull request #23:
URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449695256
########## File path: src/DotPulsar/Internal/ProducerChannel.cs ##########

```diff
@@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<
         if (autoAssignSequenceId)
         {
-            sendPackage.Command.SequenceId = _sequenceId.Current;
-            sendPackage.Metadata.SequenceId = _sequenceId.Current;
+            var newSequenceId = _sequenceId.FetchNext();
+            sendPackage.Command.SequenceId = newSequenceId;
+            sendPackage.Metadata.SequenceId = newSequenceId;
         }
         else
             sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;

         var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
         response.Expect(BaseCommand.Type.SendReceipt);

-        if (autoAssignSequenceId)
-            _sequenceId.Increment();
```

Review comment:

Having read a bit more of the Java producer, I believe I understand the disconnect here: when we retry messages, they come from the Executor that sits above this code, and so they look to the channel like entirely new messages. If more than one was outstanding, ordering (and deduplication) is no longer guaranteed, because the retransmits were all submitted asynchronously. When the Java client resends messages, it has a list of outstanding messages (with their original SequenceIds!) ready to retransmit.

I'd also like to point out that my concerns are performance related, not merely threading related. It's easy to dismiss multithreaded production as something that can't care about ordering, but let me walk through a toy use case that definitely does.

Let's say I want to send a stream of telemetry over Pulsar, so that multiple consumers can see an accurate, fast, real-time view of it. The data is sampled perhaps 100 times per second, and there are many fields, so sending the entire telemetry snapshot for every update is impossible over my device's network connection. Therefore, I split the updates into two types: full and differential.

Full updates are easy enough: they go to a separate (compacted?) topic, and every update is timestamped so a new consumer knows which one is latest and where to start applying diffs from.

Differential updates are the interesting case: they come in very fast from a single source, they are small, and they must be kept in the order they were produced. My device has enough bandwidth to send these smaller updates, but its latency to Pulsar is perhaps not the best (on the order of 100 ms each way).

In the current paradigm, I send one diff to Pulsar and then twiddle my thumbs for 200 ms until the acknowledgement comes back from the broker. In that time I have completely missed 19 updates, and even if I collapse missed diffs locally, my consumers now only see updates 5 times a second instead of 100.

In the correctly working design, I can keep sending updates while awaiting the ACKs from the broker. If I lose my connection, the client should be able to resend those messages to the broker in their original order (as long as I can buffer them in memory, at least). I still have strict ordering because at no point did I race to produce messages concurrently; they are produced exactly 10 ms apart, one at a time. Two rough sketches of what I mean follow below.
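To make the pipelining idea concrete, here is a minimal sketch of the differential-update loop. The `ITelemetryProducer` interface, its `SendAsync` method, and the back-pressure limit are all made up for illustration; this is not the real DotPulsar producer surface, just the shape of "fire the send, keep the task, await it later":

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Illustrative stand-in for whatever Send() the producer exposes.
// The point is that SendAsync returns a task we do NOT have to await
// before handing the next message to the producer.
public interface ITelemetryProducer
{
    Task SendAsync(byte[] diffPayload);
}

public static class TelemetryPipeline
{
    // Send one differential update every 10 ms without waiting for the
    // previous acknowledgement. Ordering is fixed at enqueue time, so as
    // long as the producer assigns sequence ids in call order (and resends
    // in that same order after a reconnect), the broker sees the diffs in
    // the order they were produced.
    public static async Task Run(ITelemetryProducer producer, IEnumerable<byte[]> diffs)
    {
        var pending = new Queue<Task>();

        foreach (var diff in diffs)
        {
            pending.Enqueue(producer.SendAsync(diff)); // fire, don't await yet

            // Optional back-pressure: cap the number of unacknowledged sends.
            while (pending.Count > 1000)
                await pending.Dequeue().ConfigureAwait(false);

            await Task.Delay(TimeSpan.FromMilliseconds(10)).ConfigureAwait(false);
        }

        // Drain whatever is still outstanding before shutting down.
        while (pending.Count > 0)
            await pending.Dequeue().ConfigureAwait(false);
    }
}
```

The key point is that the 10 ms cadence, not the broker round trip, sets the ordering and the throughput; the acknowledgements are only awaited for back-pressure and at shutdown.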
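And here is a rough sketch of the bookkeeping the Java client does, as I understand it: every unacknowledged message is kept alongside the sequence id it was originally sent with, so that after a reconnect the channel can replay them in order with those same ids. Again, all names and types here are hypothetical, not DotPulsar's internals:

```csharp
using System.Collections.Generic;

// Hypothetical pending-message buffer. Messages stay here from the moment
// they are handed to the channel until the broker's SendReceipt arrives,
// so a reconnect can replay them oldest-first with their ORIGINAL sequence
// ids, keeping broker-side ordering and deduplication intact.
public sealed class PendingMessageBuffer
{
    private readonly LinkedList<(ulong SequenceId, byte[] Payload)> _pending = new();

    // Called when a message is first handed to the channel.
    public void Track(ulong sequenceId, byte[] payload)
        => _pending.AddLast((sequenceId, payload));

    // Called when the broker's SendReceipt for this sequence id arrives.
    // Simplification: assumes receipts arrive in sequence-id order.
    public void Acknowledge(ulong sequenceId)
    {
        while (_pending.First is not null && _pending.First.Value.SequenceId <= sequenceId)
            _pending.RemoveFirst();
    }

    // After a reconnect, replay everything still outstanding, oldest first.
    public IEnumerable<(ulong SequenceId, byte[] Payload)> MessagesToResend()
        => _pending;
}
```

With something like that in place, retransmits no longer look like brand-new messages to the channel, which is exactly the disconnect I described at the top of this comment.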