dionjansen commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-753657484


   @blankensteiner I've tried to follow your suggestions and simplify the 
implementation a bit: I focussed just on the unacked message tracker. Perhaps 
it is anyway a good idea to separate these two mechanisms (nack tracking and 
unacked tracking) since they are configured independently of each other.
   
   > Feel free to add one or more of these (if you need them):
   > 
   > * AutoFixture
   > * AutoFixture.AutoNSubstitute
   > * AutoFixture.Xunit2
   > * NSubstitute
   > 
   
   Added `NSubstitute` and `AutoFixture.AutoNSubstitute`. I started testing the 
unacked tracker with these, but I'm not sure I'm using the correct pattern to 
verify whether messages are redelivered under different test conditions. Since 
I don't see any other unit tests that test internal classes in this way, I'm 
not sure this strategy agrees with the rest of the lib, so let me know what 
you think.
   
   > The question is if you actually need the consumer or just the consumer 
channel. The tracking should start and end together with the 
MessageQueue/ConsumerChannel.
   
   I am starting the thread now in the Pulsar client 
https://github.com/apache/pulsar-dotpulsar/blob/26cd957a56b6349f9e37bf76bd37f92a7a7e0970/src/DotPulsar/PulsarClient.cs#L81-L84
   
   The tracker is then passed to the channel factory, so it can be passed to a 
message queue that is passed to the channel when created. The tracker loop is 
stopped when the tracker is disposed, which occurs when the message queue is 
disposed, which in turn occurs when the channel is disposed. I'm still using 
`IConsumer` for start in order to avoid a duplicate implementation of 
`RedeliverUnacknowledgedMessages` in the consumer.
   
   > That's a really good question. Bookkeeper stores batched messages as one, 
so if you have a batched message consisting of 5 messages and you ack 4 of 
them, but the last times out and you ask the broker to redeliver, you will get 
the entire batch again. You could keep track of this, but it will hurt 
performance. I don't know what the other clients do here, but maybe you could 
test that?
   
   From what I can see in the [`ConsumerImpl` in 
java](https://github.com/apache/pulsar/blob/6926180966f45eb9c1499b7f0eb32ea2a1368fd6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1437-L1448)
 in case of batch messages only one item is put in the tracker for a batch 
message using `(ledgerId, entryId, partitionIndex)`. Then [when 
acking](https://github.com/apache/pulsar/blob/cc64889abe94d47f048e2f8e8fb10d6c37e695ec/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L602-L616)
 it looks like the batch is again treated as a single message, but only if 
`markAckForBatchMessage` returns false, which indicates that not all messages 
in the batch have been acked yet. I don't really see from this implementation 
how they *"keep track"* of what to ack within the batch in this way, but there 
is a lot going on here that I can't make sense of. Does the adding/removing 
mechanism for the tracker that I point out here make any sense to you?
   
   > Consider this ackTimeout implementation. First ackTimeout should be given 
as a TimeSpan (instead of an int or long as milli- or micro-seconds).
   
   Done. I kept the consumer's configuration options in milliseconds, 
though.
   
   > When a message is dequeued, and if we have an ackTimeout, then we store 
the MessageId and Stopwatch.GetTimestamp() in an "AwaitingAck" struct in a 
ConcurrentQueue, let's call it "AwaitingAcks". Other suggestions for 
concurrent collections with fast insertion are welcome.
   
   I was struggling a bit to create a comparable TimeSpan (since as you point 
out stopwatch ticks != timespan ticks). I followed 
[this](http://geekswithblogs.net/BlackRabbitCoder/archive/2012/01/12/c.net-little-pitfalls-stopwatch-ticks-are-not-timespan-ticks.aspx)
 article and concluded: 
https://github.com/apache/pulsar-dotpulsar/blob/f4725f5d81e8715a55ab3e4ea6791f903e9e9ad4/src/DotPulsar/Internal/UnackedMessageTracker.cs#L24-L25
   Not casting the frequency explicitly to double results in a considerable 
loss of accuracy.
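   
   A minimal sketch of the conversion, to show where the cast matters. This is 
not the code from the linked commit: `StopwatchTime` and its methods are 
hypothetical names, and the frequency is taken as a parameter only so that the 
arithmetic can be checked with a fixed value.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper for illustration; not the code from the linked commit.
public static class StopwatchTime
{
    // Converts a timeout to Stopwatch ticks. The frequency is the number of
    // Stopwatch ticks per second.
    public static long ToStopwatchTicks(TimeSpan timeout, long frequency)
        => (long) (timeout.TotalSeconds * frequency);

    // Converts elapsed Stopwatch ticks to a TimeSpan. The cast to double
    // keeps the fractional part that an integer division would drop.
    public static TimeSpan ToTimeSpan(long stopwatchTicks, long frequency)
        => TimeSpan.FromSeconds(stopwatchTicks / (double) frequency);

    // Convenience overload that uses the timer frequency of this machine.
    public static long ToStopwatchTicks(TimeSpan timeout)
        => ToStopwatchTicks(timeout, Stopwatch.Frequency);
}
```

   Both `TimeSpan.TicksPerSecond` and `Stopwatch.Frequency` are `long`, so a 
ratio such as `TimeSpan.TicksPerSecond / Stopwatch.Frequency` is computed with 
integer division. On a machine where the frequency is 1,000,000,000, that ratio 
would be 0.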
   
   > When a message is acknowledged, and if we have an ackTimeout, then we 
store the acks instead of removing them from "AwaitingAcks". If we want to 
remove them right away, then we need the "AwaitingAcks" collection to support 
both iteration and random deletion. We only need to store the highest 
cumulative ack we see (if there is one) and the MessageIds not included by that 
cumulative ack.
   
   How can I determine if there is a highest cumulative ack from a `MessageId` 
instance? And if we do this wouldn't we also need some kind of removal from the 
unacked list that removes until this highest value, like: 
[removeMessagesTill](https://github.com/apache/pulsar/blob/6926180966f45eb9c1499b7f0eb32ea2a1368fd6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java#L225)?
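   
   To make the question concrete, this is roughly the kind of removal I have 
in mind. It is only a sketch under assumptions: message ids are represented by 
a plain `(LedgerId, EntryId)` tuple rather than the real `MessageId`, they are 
assumed to be ordered by ledger first and entry second, and 
`CumulativeAckSketch` is a made-up name.

```csharp
using System.Collections.Generic;

// Hypothetical sketch; the tuple stands in for the real MessageId type.
public static class CumulativeAckSketch
{
    // Removes every tracked id that is less than or equal to the cumulative
    // ack and returns how many ids were removed.
    public static int RemoveMessagesTill(
        SortedSet<(long LedgerId, long EntryId)> unacked,
        (long LedgerId, long EntryId) cumulativeAck)
        => unacked.RemoveWhere(id => id.CompareTo(cumulativeAck) <= 0);
}
```

   With `(1, 5)`, `(1, 7)` and `(2, 0)` tracked, a cumulative ack of `(1, 7)` 
would remove the first two ids and leave `(2, 0)` in the set.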
   
   > 
   > When the ackTracker wakes up, it calculates what ackTimeout is in 
Stopwatch ticks (those are not the same as TimeSpan ticks) and calls 
Stopwatch.GetTimestamp(). We will now TryPeek and Dequeue from "AwaitingAcks" 
for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than the 
calculated timeout.
   > If the MessageId is not acknowledged, it's added to a 
CommandRedeliverUnacknowledgedMessages (that we are reusing) and then send it 
(if MessageIds were added).
   > If the MessageId was acknowledged, then we can remove that MessageId from 
the AckedMessageIds.
   > 
   
   I tried to capture this in 
https://github.com/apache/pulsar-dotpulsar/blob/51f4a984a5819aeb91ebcd25345b495c87cc02a7/src/DotPulsar/Internal/UnackedMessageTracker.cs#L81-L91.
 The only thing I'm still unsure about is the cumulative acking.
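   
   Stripped of the Pulsar specifics, the loop described above could be 
sketched as follows. This is not the code behind the link: message ids are 
plain strings, timestamps are passed in instead of being read from 
`Stopwatch.GetTimestamp()`, and `AckTimeoutSketch` with its members is a 
hypothetical name used only for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the "AwaitingAck" struct from the description.
public readonly struct AwaitingAck
{
    public AwaitingAck(string messageId, long timestamp)
    {
        MessageId = messageId;
        Timestamp = timestamp;
    }

    public string MessageId { get; }
    public long Timestamp { get; }
}

// Hypothetical sketch of the tracker; assumes a single thread calls CollectTimedOut.
public sealed class AckTimeoutSketch
{
    private readonly ConcurrentQueue<AwaitingAck> _awaitingAcks = new ConcurrentQueue<AwaitingAck>();
    private readonly ConcurrentDictionary<string, bool> _acked = new ConcurrentDictionary<string, bool>();
    private readonly long _timeoutTicks;

    public AckTimeoutSketch(long timeoutTicks) => _timeoutTicks = timeoutTicks;

    // Called when a message is dequeued by the consumer.
    public void Add(string messageId, long now)
        => _awaitingAcks.Enqueue(new AwaitingAck(messageId, now));

    // Acks are stored instead of being removed from the queue right away.
    public void Acknowledge(string messageId) => _acked.TryAdd(messageId, true);

    // Called when the tracker wakes up; returns the ids to redeliver.
    public List<string> CollectTimedOut(long now)
    {
        var toRedeliver = new List<string>();
        while (_awaitingAcks.TryPeek(out var head) && now - head.Timestamp > _timeoutTicks)
        {
            if (!_awaitingAcks.TryDequeue(out var expired))
                break;
            // An acked id is simply forgotten; an unacked one is redelivered.
            if (!_acked.TryRemove(expired.MessageId, out _))
                toRedeliver.Add(expired.MessageId);
        }
        return toRedeliver;
    }
}
```

   Because entries are enqueued in timestamp order, the loop can stop at the 
first entry that has not timed out yet.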
   
   > Consider this nackTimeout implementation.
   > 
   > When a message is nacked, we added the MessageId(s) to a 
CommandRedeliverUnacknowledgedMessages that we are reusing (should our 
"RedeliverUnacknowledgedMessages" taking an enumerable of messageId actually 
have been called "NegativeAcknowledge"?).
   > 
   > When the nackTracker wakes up it will check if the 
CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.
   
   I will implement the nack tracker if you are happy with the unacked tracker 
as it stands (which might also be refactored into one single tracker if two 
trackers are a performance concern to you).
   
   > 
   > Writing such a detailed implementation description was not what I 
intended, but when I first get going.... :-D
   > Anyway, if it is unclear, then I can try and make those classes/structs 
and push them to master.
   
   Let me know what you think. This is still a bit of a learning process for 
me, both on the internals of this lib and on C# / Pulsar details. Thanks in 
advance 👍.
   
   
   

