blankensteiner commented on pull request #67:
URL: https://github.com/apache/pulsar-dotpulsar/pull/67#issuecomment-749551670


   Hi @dionjansen 
   Thanks for the PR!
   I'll try and answer the best I can :-)
   
   > I've added a reference to Microsoft.Bcl.AsyncInterfaces to support the 
IDisposable interface in net5.0. I only had this issue in VSCode for the test 
and sample projects. This does not influence the tests or running samples on 
VSC (on mac).
   
   We have tried adding support for Visual Studio Code before, but sadly it's 
just not a pleasant road to go down. I have no idea why Microsoft has created 
two IDEs for C# and doesn't ensure that they behave the same.
   Visual Studio Code will create nonsense warnings and require unnecessary 
changes to the code-base and therefore it's not supported for developing 
DotPulsar. Visual Studio Community Edition and the commercial offerings and 
Rider are supported, so you have to use one of those.
   
   > I would like to add some more unit tests though this is a bit hard without 
a proper mocking framework, would you be open to introducing something like Moq 
to allow testing classes in a shallow way? Or do you rather have I follow a 
different unit test strategy?
   
   Feel free to add one or more of these (if you need them):
   
   - AutoFixture
   - AutoFixture.AutoNSubstitute
   - AutoFixture.Xunit2
   - NSubstitute
   
   > Right now I create a IMessageAcksTracker instance in the PulsarClient, 
though when clients do not have the consumer configured to use the tracking, a 
dummy InactiveMessageAcksTracker is passed. This instance is passed to the 
ConsumerChannelFactory and used when a channel is created. Does this setup 
makes sense to you?
   
   Yes.
   
   > When the tracker is started it runs indefinitely I find it hard to find a 
good place to start this thread, also since I need the consumer to call 
RedeliverUnacknowledgedMessages. Any ideas on how to improve this pattern? 
Perhaps a static method like some of the StateMonitor methods.
   
   The question is if you actually need the consumer or just the consumer 
channel. The tracking should start and end together with the 
MessageQueue/ConsumerChannel.
   
   > I wonder how batched messages fit into all this, does a batch have a 
single message id? And should I be able to just redeliver that message id to 
release the batch back to the broker?
   
   That's a really good question. BookKeeper stores batched messages as one, so 
if you have a batched message consisting of 5 messages and you ack 4 of them, 
but the last one times out and you ask the broker to redeliver, you will get the 
entire batch again. You could keep track of this, but it will hurt performance. 
I don't know what the other clients do here, but maybe you could test that?
   
   > The MessageAcksTracker uses a polling mechanism to re-check for timed out 
messages (either due to being unacked for too long or the nack delay has been 
exceeded). Is this (generally) what you were thinking about too? Alternatively 
I could think of an approach where polling is done through a Timer. Or we could 
create individual Tasks for each added message to the tracker but I'm concerned 
of the overhead created by this.
   
   I agree that a timer/task per message will hurt performance too much. Having 
one task for the entire Consumer/MessageQueue, which wakes up and looks at what 
needs to be redelivered, is the right solution. Here we need to find a 
thread-safe and performant way of storing and accessing this information.
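
   A minimal sketch of the "one task per Consumer/MessageQueue" idea, under 
stated assumptions: `TrackerLoop`, `Run`, `interval` and `sweep` are 
hypothetical names and not part of DotPulsar. The loop sleeps, wakes up, runs a 
single sweep over whatever needs redelivering, and stops when the token tied to 
the channel's lifetime is cancelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: one background loop per consumer channel.
public static class TrackerLoop
{
    // 'sweep' stands in for "look at what needs to be redelivered".
    public static async Task Run(TimeSpan interval, Action sweep, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                // Sleep until the next wake-up, or until the channel is closed.
                await Task.Delay(interval, token).ConfigureAwait(false);
                sweep();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is cancelled during the delay.
        }
    }
}
```

   Cancelling the token when the MessageQueue/ConsumerChannel is disposed would 
make the tracking start and end together with the channel.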
   
   > Atm I haven't considered the scenario yet that only a nack delay is 
configured and not an ack timeout in which case we will not have to track all 
dequeued messages.
   
   Just a boolean check on dequeue and ack? We also need to handle cumulative 
acknowledgment.
   
   Consider this ackTimeout implementation. First, ackTimeout should be given 
as a TimeSpan (instead of an int or long in milli- or microseconds).
   
   When a message is dequeued, and if we have an ackTimeout, then we store the 
MessageId and Stopwatch.GetTimestamp() in an "AwaitingAck" struct in a 
ConcurrentQueue, let's call it "AwaitingAcks". Other suggestions for 
concurrent collections with fast insertion are welcome.
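
   The storage described above could be sketched roughly as follows. This is 
only an illustration: `SketchMessageId` is a hypothetical stand-in for 
DotPulsar's MessageId (reduced to two fields), and the `AwaitingAcks` wrapper 
class with its `Add` and `Count` members is an assumption, not existing code.

```csharp
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical stand-in for DotPulsar's MessageId (C# 9 record, value equality).
public sealed record SketchMessageId(ulong LedgerId, ulong EntryId);

// One entry per dequeued message that is still waiting for an ack.
public readonly struct AwaitingAck
{
    public AwaitingAck(SketchMessageId messageId)
    {
        MessageId = messageId;
        // Stopwatch ticks, which are not the same unit as TimeSpan ticks.
        Timestamp = Stopwatch.GetTimestamp();
    }

    public SketchMessageId MessageId { get; }
    public long Timestamp { get; }
}

public sealed class AwaitingAcks
{
    private readonly ConcurrentQueue<AwaitingAck> _queue = new ConcurrentQueue<AwaitingAck>();

    // Called on dequeue, and only when an ackTimeout is configured.
    public void Add(SketchMessageId messageId) => _queue.Enqueue(new AwaitingAck(messageId));

    public int Count => _queue.Count;
}
```

   Because entries are enqueued in dequeue order, the oldest timestamp is always 
at the head of the queue, which is what makes a peek-based sweep possible.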
   
   When a message is acknowledged, and if we have an ackTimeout, then we store 
the acks instead of removing them from "AwaitingAcks". If we want to remove 
them right away, then we need the "AwaitingAcks" collection to support both 
iteration and random deletion. We only need to store the highest cumulative ack 
we see (if there is one) and the MessageIds not included by that cumulative ack.
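
   A rough sketch of storing "the highest cumulative ack plus the MessageIds not 
covered by it". It assumes message ids are comparable (`IComparable<TId>`), and 
`AckedMessageIds<TId>` with all of its members is a hypothetical type. It uses a 
plain lock for brevity, so it shows the bookkeeping rather than the performant 
data structure asked for above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical store for acks that have not yet been matched by the tracker.
public sealed class AckedMessageIds<TId> where TId : IComparable<TId>
{
    private readonly object _lock = new object();
    private readonly HashSet<TId> _individual = new HashSet<TId>();
    private bool _hasCumulative;
    private TId _highestCumulative = default!;

    public void Ack(TId id)
    {
        lock (_lock)
        {
            // No need to store ids already covered by the cumulative ack.
            if (!Covered(id)) _individual.Add(id);
        }
    }

    public void AckCumulative(TId id)
    {
        lock (_lock)
        {
            if (_hasCumulative && id.CompareTo(_highestCumulative) <= 0) return;
            _highestCumulative = id;
            _hasCumulative = true;
            // Individual acks at or below the new cumulative ack are now redundant.
            _individual.RemoveWhere(x => x.CompareTo(id) <= 0);
        }
    }

    public bool IsAcked(TId id)
    {
        lock (_lock) { return Covered(id) || _individual.Contains(id); }
    }

    public int IndividualCount
    {
        get { lock (_lock) { return _individual.Count; } }
    }

    private bool Covered(TId id) =>
        _hasCumulative && id.CompareTo(_highestCumulative) <= 0;
}
```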
   
   When the ackTracker wakes up, it needs to have calculated what ackTimeout is 
in Stopwatch ticks (those are not the same as TimeSpan ticks). It will then call 
Stopwatch.GetTimestamp(). We will now TryPeek and TryDequeue from "AwaitingAcks" 
for as long as the tracker timestamp - AwaitingAck.Timestamp is larger than 
the calculated timeout.
   If the MessageId is not acknowledged, it's added to a 
CommandRedeliverUnacknowledgedMessages (that we are reusing), which is then sent 
(if MessageIds were added).
   If the MessageId was acknowledged, then we can remove that MessageId from 
the AckedMessageIds.
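
   The wake-up step could look something like the sketch below. Assumptions: 
`AckTimeoutSweeper<TId>` and its members are hypothetical names, message ids are 
represented by a generic `TId`, acks are kept in a ConcurrentDictionary used as 
a set, cumulative acks are left out, and only one thread ever calls `Sweep`. 
Instead of filling a CommandRedeliverUnacknowledgedMessages, it just returns the 
ids that would go into it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;

public sealed class AckTimeoutSweeper<TId> where TId : notnull
{
    private readonly ConcurrentQueue<(TId Id, long Timestamp)> _awaitingAcks =
        new ConcurrentQueue<(TId Id, long Timestamp)>();
    private readonly ConcurrentDictionary<TId, byte> _ackedIds =
        new ConcurrentDictionary<TId, byte>();
    private readonly long _timeoutInStopwatchTicks;

    public AckTimeoutSweeper(TimeSpan ackTimeout)
    {
        // Stopwatch.Frequency is the number of Stopwatch ticks per second.
        _timeoutInStopwatchTicks = (long)(ackTimeout.TotalSeconds * Stopwatch.Frequency);
    }

    public void Dequeued(TId id) => _awaitingAcks.Enqueue((id, Stopwatch.GetTimestamp()));

    // Acks are stored, not removed from the queue (no random deletion needed).
    public void Acknowledged(TId id) => _ackedIds.TryAdd(id, 0);

    // Returns the ids that timed out without an ack.
    public List<TId> Sweep()
    {
        var now = Stopwatch.GetTimestamp();
        var toRedeliver = new List<TId>();

        while (_awaitingAcks.TryPeek(out var head) &&
               now - head.Timestamp > _timeoutInStopwatchTicks)
        {
            // With a single sweeper, the dequeued entry is the one just peeked.
            if (!_awaitingAcks.TryDequeue(out var expired)) break;

            // Acked in time: forget the ack. Otherwise: schedule redelivery.
            if (!_ackedIds.TryRemove(expired.Id, out _))
                toRedeliver.Add(expired.Id);
        }

        return toRedeliver;
    }
}
```

   One gap in this sketch: an ack that arrives after its message has already 
been swept stays in the acked set, so a real implementation would need some way 
to clean those up.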
   
   Consider this nackTimeout implementation.
   
   When a message is nacked, we add the MessageId(s) to a 
CommandRedeliverUnacknowledgedMessages that we are reusing (should our 
"RedeliverUnacknowledgedMessages", which takes an enumerable of MessageIds, 
actually have been called "NegativeAcknowledge"?).
   
   When the nackTracker wakes up it will check if the 
CommandRedeliverUnacknowledgedMessages has MessageIds and if yes, then send it.
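
   A small sketch of that nack flow, with the caveat that it buffers ids in a 
ConcurrentQueue rather than writing straight into a reused 
CommandRedeliverUnacknowledgedMessages. `NackBuffer<TId>`, `Nack`, `Flush` and 
the `send` callback are all hypothetical; `send` stands in for whatever 
actually sends the command on the consumer channel.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public sealed class NackBuffer<TId>
{
    private readonly ConcurrentQueue<TId> _pending = new ConcurrentQueue<TId>();

    // Called when a message is nacked.
    public void Nack(TId id) => _pending.Enqueue(id);

    // Called when the nackTracker wakes up. Returns true if anything was sent.
    public bool Flush(Action<IReadOnlyList<TId>> send)
    {
        var batch = new List<TId>();
        while (_pending.TryDequeue(out var id))
            batch.Add(id);

        if (batch.Count == 0)
            return false; // Nothing was nacked since the last wake-up.

        send(batch);
        return true;
    }
}
```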
   
   Writing such a detailed implementation description was not what I intended, 
but once I get going.... :-D
   Anyway, if it is unclear, then I can try and make those classes/structs and 
push them to master.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org