Chickenzilla commented on a change in pull request #23:
URL: https://github.com/apache/pulsar-dotpulsar/pull/23#discussion_r449695256



##########
File path: src/DotPulsar/Internal/ProducerChannel.cs
##########
@@ -90,18 +90,16 @@ public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<
 
                 if (autoAssignSequenceId)
                 {
-                    sendPackage.Command.SequenceId = _sequenceId.Current;
-                    sendPackage.Metadata.SequenceId = _sequenceId.Current;
+                    var newSequenceId = _sequenceId.FetchNext();
+                    sendPackage.Command.SequenceId = newSequenceId;
+                    sendPackage.Metadata.SequenceId = newSequenceId;
                 }
                 else
                     sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
 
                 var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
                 response.Expect(BaseCommand.Type.SendReceipt);
 
-                if (autoAssignSequenceId)
-                    _sequenceId.Increment();

Review comment:
       Having read a bit more of the Java producer, I believe I understand the 
disconnect here: when we retry messages, they come from the Executor that sits 
above this code, and so look to the channel like entirely new messages. If more 
than one was outstanding, ordering (and deduplication) is no longer guaranteed 
because the retransmits were all submitted asynchronously.  When the Java 
client resends messages, it has a list of outstanding messages (with their 
original SequenceIds!) ready to retransmit.
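   
   To make the deduplication half of that concrete, here is a minimal sketch (simplified, and not DotPulsar's or the broker's actual code; `DedupCheck` is a made-up name) of how broker-side dedup keys off the producer's sequence ids: a retransmit that keeps its original id is recognised and dropped, while a retransmit that is handed a fresh id gets persisted again as a duplicate.

```csharp
// Simplified sketch of broker-side deduplication; not actual Pulsar broker code.
// The broker tracks the highest sequence id it has persisted for a producer
// and drops anything at or below it.
public sealed class DedupCheck
{
    private ulong _lastPersistedSequenceId;
    private bool _hasPersistedAnything;

    public bool ShouldPersist(ulong sequenceId)
    {
        if (_hasPersistedAnything && sequenceId <= _lastPersistedSequenceId)
            return false;                       // a retransmit with its original id: drop it

        _lastPersistedSequenceId = sequenceId;  // a higher id: persist it, even if the payload
        _hasPersistedAnything = true;           // is actually a retransmitted duplicate
        return true;
    }
}
```

   That's why the resend path needs the originally assigned ids rather than ids fetched fresh at send time.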
   
   I'd also like to point out that my concerns are performance related and not merely threading related.  It's easy to dismiss multithreaded producers as not caring about ordering, but allow me to explain a toy use-case where ordering definitely matters.
   
   Let's say I want to send a set of telemetry over Pulsar.  This way I can 
have multiple consumers seeing an accurate, fast, real-time view of my 
telemetry.  This data is sampled perhaps 100 times per second, and there are many fields, so sending the full telemetry set for every update is more than my device's network connection can handle.  Therefore, I split the updates into two types: 
full and differential.  Full updates are easy enough: they go to a separate 
(compacted?) topic, and every update is timestamped so a new consumer knows 
what's latest and where to start applying updates from.
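   
   Just to sketch the shape of that split (the `ITelemetryProducer` interface below is a stand-in, not DotPulsar's API): full snapshots go keyed and timestamped to the compacted topic so only the latest survives, while diffs go to a second topic whose only job is to preserve order.

```csharp
using System.Threading.Tasks;

// Stand-in producer abstraction; not DotPulsar's API.
public interface ITelemetryProducer
{
    Task SendAsync(string key, byte[] payload);
}

public sealed class TelemetryPublisher
{
    private readonly ITelemetryProducer _fullTopic;  // compacted: latest snapshot per key wins
    private readonly ITelemetryProducer _diffTopic;  // plain topic: small, order-sensitive deltas

    public TelemetryPublisher(ITelemetryProducer fullTopic, ITelemetryProducer diffTopic)
    {
        _fullTopic = fullTopic;
        _diffTopic = diffTopic;
    }

    // Occasional full snapshot: keyed by device so compaction keeps only the newest,
    // and timestamped (inside the payload) so a new consumer knows where diffs start.
    public Task PublishFullAsync(string deviceId, byte[] snapshot) =>
        _fullTopic.SendAsync(deviceId, snapshot);

    // 100 Hz differential updates: must reach consumers in production order.
    public Task PublishDiffAsync(string deviceId, byte[] diff) =>
        _diffTopic.SendAsync(deviceId, diff);
}
```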
   
   Differential updates are interesting because they come in very fast from a 
single source, are small, and need to be maintained in the order they were 
produced.  My device has enough bandwidth to send these smaller updates, but 
its latency to Pulsar is perhaps not the best (on the order of 100 ms each way).  In the current paradigm, I send one diff to Pulsar and then twiddle my thumbs for ~200 ms until the acknowledgement comes back from the broker.  In that time I've completely missed 19 updates, and even if I collapse missed diffs locally, my consumers now only get updates 5 times a second instead of 100.
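   
   The throughput cap falls straight out of the send loop.  In rough C# (the producer interface is a stand-in for any send that only completes once the broker receipt arrives), every iteration pays a full round trip before the next sample can go out:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for any producer whose Send only completes once the broker receipt arrives.
public interface IReceiptAwaitedProducer
{
    Task Send(byte[] payload, CancellationToken cancellationToken);
}

public static class BlockingTelemetryLoop
{
    // Awaiting every receipt serialises the sends: at ~200 ms per round trip,
    // a 100 Hz sampler only gets ~5 diffs per second onto the wire.
    public static async Task RunAsync(
        IReceiptAwaitedProducer producer,
        Func<byte[]> sampleDiff,
        CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var diff = sampleDiff();                      // a new sample exists every 10 ms
            await producer.Send(diff, cancellationToken); // but the loop stalls for a full RTT
        }
    }
}
```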
   
   In a correctly working design, I can keep sending updates while the ACKs from the broker are still in flight.  If I lose my connection, the client should be able to resend those messages to the broker in their original order (as long as I can buffer them in memory, at least).  I still have strict ordering, because at no point did I race to produce messages concurrently; they all happen exactly 10 ms apart, one at a time.
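   
   A rough sketch of that pipelined shape, again with stand-in types rather than DotPulsar's real ones: sends are written out without awaiting each receipt, everything in flight sits in an ordered buffer, and a reconnect replays that buffer front to back with the original sequence ids, so ordering (and broker deduplication) still hold.  The use-case is single-threaded, one message at a time, so no locking is shown.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in connection abstraction; not DotPulsar's API.
public interface IOrderedConnection
{
    // Writes the message to the socket; the broker's receipt arrives later, out of band.
    ValueTask WriteAsync(ulong sequenceId, byte[] payload, CancellationToken cancellationToken);
}

public sealed class PipelinedSender
{
    private readonly IOrderedConnection _connection;
    private readonly Queue<(ulong SequenceId, byte[] Payload)> _inFlight =
        new Queue<(ulong, byte[])>();
    private ulong _nextSequenceId;

    public PipelinedSender(IOrderedConnection connection) => _connection = connection;

    // Fire the send immediately; the caller does not wait for the receipt,
    // so a 10 ms production rate is not throttled by a 200 ms round trip.
    public async ValueTask SendAsync(byte[] payload, CancellationToken cancellationToken)
    {
        var sequenceId = _nextSequenceId++;
        _inFlight.Enqueue((sequenceId, payload));
        await _connection.WriteAsync(sequenceId, payload, cancellationToken);
    }

    // Receipts come back in order; retire everything up to the acknowledged id.
    public void OnReceipt(ulong sequenceId)
    {
        while (_inFlight.Count > 0 && _inFlight.Peek().SequenceId <= sequenceId)
            _inFlight.Dequeue();
    }

    // After a reconnect, replay the buffer oldest-first with the original
    // sequence ids, so ordering and broker-side deduplication still work.
    public async ValueTask ResendAsync(CancellationToken cancellationToken)
    {
        foreach (var (sequenceId, payload) in _inFlight)
            await _connection.WriteAsync(sequenceId, payload, cancellationToken);
    }
}
```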




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

