> we use the "synchronous" guarantee to ensure that the messages we emit from Samza are delivered to Kafka linearly.
Samza already guarantees that for you. Currently, two *send()* calls that
produce to the same topic partition are *always* delivered in order,
regardless of whether you called commit, batched up, or otherwise. Do you
want ordering across multiple send() calls to different partitions? (We
have not had a scenario for that yet.)

Thanks,
Jagadish

On Thu, Mar 9, 2017 at 10:19 AM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

> There is one more case to consider - what if a single process() call sends
> multiple messages? In that case, would we need to call checkpoint after
> every send() inside the same process() call? That seems problematic: once
> checkpointed, there is no safety net against any failures in subsequent
> send() calls from the same process() call.
>
> Thanks for being patient with my questions!
>
> ________________________________
> From: Gaurav Agarwal <gauravagarw...@gmail.com>
> Sent: Thursday, March 9, 2017 11:27:17 PM
> To: dev@samza.apache.org
> Cc: Mukul Gupta; Kshitij Gupta
> Subject: Re: Samza 0.12.0 + synchronous KafkaProducer?
>
> Hi Jagadish, please find my replies inline.
>
> (It appears that there is no easy way today to guarantee ordered delivery
> of messages to Kafka from Samza without giving up the flexibility of
> checkpointing.)
>
> On Thu, Mar 9, 2017 at 11:01 PM, Jagadish Venkatraman
> <jagadish1...@gmail.com> wrote:
>
> Hi Gaurav,
>
> >> process->process->....->doWork()->checkpoint->process..
>
> What does *doWork()* do? Does it actually iterate over accumulated
> in-memory state and send messages to Kafka?
>
> >> In process() calls, some "work" gets accumulated in memory (think of it
> as strings representing some work). Frequently, many process() calls
> produce identical work, so accumulating it in a set has the effect of
> "deduping" it. In the window() call, we actually process the accumulated
> work and call Samza's checkpoint explicitly after that.
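As an aside for readers of this thread, the accumulate-and-dedupe pattern Gaurav describes can be sketched in a few lines of plain Java. The class and method names here (`WorkBuffer`, `drain`) are illustrative only, not Samza APIs:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of the pattern described above: process() calls add "work" strings
// to a set, so identical work collapses to a single entry; window() drains
// the set to actually perform the batched work, after which the task would
// call Samza's commit to checkpoint.
public class WorkBuffer {
    // LinkedHashSet dedupes while preserving first-insertion order.
    private final Set<String> pending = new LinkedHashSet<>();

    // Called from process(): accumulate work; duplicates are absorbed.
    public void add(String work) {
        pending.add(work);
    }

    // Called from window(): hand back the deduped batch and reset.
    public List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```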
>
> *>> I found the configuration 'batch.size' which says that "a batch size
> of zero will disable batching entirely". We can probably use this property
> to force the Kafka client to send every message inline.*
>
> Yes, but I'm not entirely sure it will help your use case. Just because
> "batch.size" is zero does not necessarily mean the send is synchronous.
> For instance, you can totally have an async send with a zero batch size.
> It's still possible (and likely) that the message has not made it to the
> broker when send() returns.
>
> >> Yes, you are correct. We would require stronger sync guarantees if we
> need to ensure ordered delivery of messages.
>
> *>> This exception is only notified back to the task on the next
> invocation of send(). This is a bit puzzling: (a) the exception being
> thrown is for the send of a message which did not do anything wrong.*
>
> Thanks for the diligent walkthrough. However, that's perfectly fine, as we
> still preserve Kafka's guarantee of at-least-once, in-order delivery.
>
> *>> What if a checkpoint was called after calling send() for the message
> that caused the exception? Will we lose processing of that message?
> Wouldn't it be too late by the time the exception is thrown back to the
> client?*
>
> Great observation! We will never lose processing of that message. The call
> to commit will wait on pending futures, or until there's an exception. In
> your specific case, the commit call will fail. On a restart you will
> replay from the previously checkpointed offset (as of the last successful
> commit).
>
> >> Thanks for clarifying this.
>
> Users have reported these exact issues when running Samza at scale, and
> they have been fixed. Please refer to SAMZA-1069 and SAMZA-960.
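The distinction made above, between "send() returned" and "the broker accepted the message", can be illustrated with plain Java futures. This is a toy stand-in for an asynchronous producer, not the Kafka client API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy illustration (NOT the Kafka producer API): an async send() hands the
// record to a background thread and returns immediately; the broker "ack"
// only happens later. Blocking on the returned future is what makes the
// delivery synchronous from the caller's point of view.
public class AsyncSendDemo {
    public static final AtomicBoolean acked = new AtomicBoolean(false);

    // Simulates an asynchronous producer send: returns before the "broker"
    // has acknowledged the record.
    public static CompletableFuture<Void> send(String record) {
        return CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulated network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            acked.set(true); // the "broker" acknowledges the record
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = send("payload");
        // Here the send has "returned", but delivery is not yet assured.
        pending.join(); // synchronous: wait for the ack before proceeding
        System.out.println("acked after join(): " + acked.get());
    }
}
```

Setting `batch.size=0` changes only how records are grouped on the wire; it does not insert the `join()`-style wait shown here, which is why a zero batch size alone gives no synchronous guarantee.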
>
> *>> The point I am driving at is: should the send() method in the
> MessageCollector interface provide an optional mechanism to operate it in
> a strictly synchronous mode?*
>
> Even if it did, wouldn't you still need to tie it with the offset commits,
> changelog commits, store-buffer flushes to disk, etc. to achieve your
> guarantees across restarts? task.commit does that.
>
> >> I think synchronous sends and checkpointing/commits are not entirely
> connected - they are two distinct aspects of the system. In the
> application use case that I've described, we use the "synchronous"
> guarantee to ensure that the messages we emit from Samza are delivered to
> Kafka linearly, whereas the checkpoint concept is being used to "batch" up
> some amount of work and then advance the offset once that batch is
> processed (which I believe is the original intent of checkpointing).
>
> If we used up "checkpointing" to guarantee ordered delivery, we would
> lose the capability of batching.
>
> Do let us know if you have more follow-ups.
>
> Thanks,
> Jagadish
>
> On Wed, Mar 8, 2017 at 10:16 PM, Gaurav Agarwal
> <gauravagarw...@gmail.com> wrote:
>
> > Hi Jagadish,
> >
> > Thank you for the very quick and detailed response.
> >
> > We have already set task.commit.ms = -1 and are using the checkpointing
> > mechanism to accumulate some work in memory in order to do it more
> > efficiently in batches. So the flow is
> > process->process->....->doWork()->checkpoint->process..
> > Checkpointing after every process() call would defeat the above strategy
> > that we have been following in our application.
> >
> > However, looking through the Kafka docs, I found the configuration
> > 'batch.size', which says that "a batch size of zero will disable
> > batching entirely". We can probably use this property to force the
> > Kafka client to send every message inline.
> > Does that sound reasonable to you?
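For readers following along, the settings touched on in this exchange look roughly like this in a Samza job config. Property names follow the Samza 0.12 / Kafka 0.10 documentation and assume a system named `kafka`; verify them against your versions, and note again that `batch.size=0` alone does not make send() synchronous:

```
# Disable time-based auto-commit so the task can call commit explicitly.
task.commit.ms=-1

# Passed through to the Kafka producer. batch.size=0 disables batching but
# does NOT make send() synchronous; the record can still be in flight when
# send() returns.
systems.kafka.producer.batch.size=0

# Kafka producer settings that help preserve ordering under retries.
systems.kafka.producer.acks=all
systems.kafka.producer.max.in.flight.requests.per.connection=1
```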
> >
> > There is another related question here (and please excuse me if this is
> > a stupid one!) -
> >
> > If there is any exception in Samza's KafkaSystemProducer.send() method,
> > that exception is stored in the SourceData object. This exception is
> > only notified back to the task on the next invocation of send().
> > This is a bit puzzling: (a) the exception being thrown is for the send
> > of a message which did not do anything wrong - the previous guy broke
> > it! And (b) what if a checkpoint was called after calling send() for
> > the message that caused the exception? Will we lose processing of that
> > message? Wouldn't it be too late by the time the exception is thrown
> > back to the client?
> >
> > The point I am driving at is: should the send() method in the
> > MessageCollector interface provide an optional mechanism to operate it
> > in a strictly synchronous mode?
> >
> > On Thu, Mar 9, 2017 at 11:25 AM, Jagadish Venkatraman
> > <jagadish1...@gmail.com> wrote:
> >
> > > Gaurav,
> > >
> > > I really appreciate your diligent walkthrough of the code base.
> > > Please find my replies inline.
> > >
> > > *>> I am trying to figure out how to make our Samza task processing
> > > strictly ordered.*
> > >
> > > By default, Samza offers you guaranteed in-order, at-least-once
> > > processing out of the box (the same semantics as Kafka). To ensure
> > > that each send is "acknowledged" by the broker, you can choose to
> > > invoke Samza's *commit* at the end of processing every message.
> > >
> > > *>> We do not want to start processing the next message until it is
> > > guaranteed that our previously emitted messages from Samza tasks have
> > > been accepted by the Kafka broker. Is there any Samza configuration
> > > that will make this happen?*
> > >
> > > You can do the following:
> > > A. Set task.commit.ms = -1. (This will disable auto-commit and allow
> > > you to call commit manually.)
> > > B.
> > > At the end of every *process* or *window* call, you can invoke
> > > *taskCoordinator.commit(RequestScope.CURRENT_TASK);*
> > >
> > > *>> The `MessageCollector` interface does not expose a 'flush()'
> > > method that we could have called after doing a send() to ensure the
> > > delivery of a message to the Kafka broker.*
> > >
> > > This is intentional (to provide a single commit/flush API via the
> > > *taskCoordinator* abstraction). Invoking *taskCoordinator.commit*
> > > will wait on pending futures, flush buffers, flush state stores, and
> > > checkpoint offsets.
> > >
> > > Please let us know if we can be of more help!
> > >
> > > Thanks,
> > > Jagadish
> > >
> > > On Wed, Mar 8, 2017 at 9:12 PM, Gaurav Agarwal
> > > <gauravagarw...@gmail.com> wrote:
> > >
> > > > (correcting recipient address)
> > > >
> > > > On Thu, Mar 9, 2017 at 10:39 AM, Gaurav Agarwal
> > > > <gauravagarw...@gmail.com> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > We are trying to upgrade to Samza 0.12.0. In the process we
> > > > > noticed that the Kafka 0.10.0 KafkaProducer client API does not
> > > > > provide any configuration to make send() synchronous. One needs
> > > > > to wait on the returned Future for synchronous guarantees.
> > > > >
> > > > > I am trying to figure out how to make our Samza task processing
> > > > > strictly ordered - i.e., we want to process an incoming message
> > > > > and optionally write back some messages to Kafka. We do not want
> > > > > to start processing the next message until it is guaranteed that
> > > > > our previously emitted messages from Samza tasks have been
> > > > > accepted by the Kafka broker.
> > > > >
> > > > > Is there any Samza configuration that will make this happen?
> > > > > The `MessageCollector` interface does not expose a 'flush()'
> > > > > method that we could have called after doing a send() to ensure
> > > > > the delivery of a message to the Kafka broker. (Note that
> > > > > `TaskInstanceCollector`, a specific implementation of the
> > > > > `MessageCollector` interface, does provide the required flush()
> > > > > method.)
> > > > >
> > > > > --
> > > > > cheers,
> > > > > gaurav

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
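A postscript for readers: the deferred-exception behavior raised in this thread, where a failure from an earlier asynchronous send only surfaces on the next send() call, can be modeled in a few lines of plain Java. This is a toy model, not Samza's actual KafkaSystemProducer:

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy model (NOT Samza's KafkaSystemProducer) of the behavior discussed in
// the thread: a failure from an earlier asynchronous send is recorded and
// rethrown on the NEXT send() call, so the call that throws is not the one
// that actually caused the error.
public class DeferredErrorProducer {
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // The async completion path would invoke this when a background send fails.
    public void onSendFailed(RuntimeException e) {
        pendingError.compareAndSet(null, e);
    }

    // First surface any earlier failure, then accept the new record.
    public void send(String record) {
        RuntimeException earlier = pendingError.getAndSet(null);
        if (earlier != null) {
            throw earlier; // thrown while sending a message that "did nothing wrong"
        }
        // ... hand the record to the async client here ...
    }
}
```

This is why, in the thread above, the commit call is described as the place where pending failures are reliably surfaced: commit waits on the outstanding sends instead of letting an error ride along to an unrelated later call.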