Hi Gaurav,

*>> process->process->....->doWork()->checkpoint->process..*

What does *doWork()* do? Does it actually iterate over the accumulated in-memory state and send messages to Kafka?

*>> I found the configuration 'batch.size' which says that "a batch size of zero will disable batching entirely". We can probably use this property to force Kafka client to send every message inline.*

Yes, but I'm not entirely sure it will help your use-case. Just because "batch.size" is zero does not necessarily mean the "send" is synchronous. For instance, you can have an async send with a zero batch size, and it is still possible (and likely) that the message has not made it to the broker by the time send() returns.
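To make this concrete, here is a minimal sketch against the plain KafkaProducer API (the broker address and topic name are placeholders, and this is illustrative client code, not Samza's internal producer). Even with batch.size set to zero, send() only hands the record to the client and returns a Future; blocking on that Future (or calling flush()) is what actually waits for the broker's acknowledgement:

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("batch.size", "0"); // disables batching, but send() stays async

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // send() returns immediately; the record may not have left the client yet.
      Future<RecordMetadata> future =
          producer.send(new ProducerRecord<>("my-topic", "key", "value"));

      // Only blocking on the Future guarantees the broker has acked the record.
      RecordMetadata metadata = future.get();
      System.out.println("acked at offset " + metadata.offset());
    }
  }
}

Samza's commit gives you that same blocking point without having to hold on to the futures yourself.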
*>> This exception is only notified back to task on next invocation of send(). This is a bit puzzling (a) The exception being thrown is for send of a message which did not do anything wrong.*

Thanks for the diligent walkthrough. However, that is perfectly fine, because we still preserve Kafka's guarantee of at-least-once, in-order delivery.

*>> what if a checkpoint was called after calling send() of the message that caused exception? Will we lose processing of that message? Wouldn't it be too late by the time the exception is thrown back to the client?*

Great observation! We will never lose processing of that message. The call to commit waits on the pending futures and fails if any of them hit an exception. In your specific case, the commit call will fail. On a restart you will replay from the previously check-pointed offset (i.e., as of the last successful commit). Users have reported these exact issues when running Samza at scale, and they have been fixed. Please refer to SAMZA-1069 and SAMZA-960.

*>> The point I am driving at is that should the send() method in MessageCollector interface provide an optional mechanism to operate it in strictly synchronous mode?*

Even if it did, wouldn't you still need to tie it to the offset commits, changelog commits, store-buffer flushes to disk, etc. to achieve your guarantees across restarts? task.commit does exactly that.
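If a sketch helps, here is roughly what per-message manual commit could look like in a low-level StreamTask. The class name and the output system/stream are placeholders, and this assumes task.commit.ms = -1 in the job config so that only manual commits run:

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class StrictlyOrderedTask implements StreamTask {
  // Placeholder output stream; substitute your real system/stream names.
  private static final SystemStream OUTPUT = new SystemStream("kafka", "output-topic");

  @Override
  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // Emit the (possibly transformed) message; send() is asynchronous.
    collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));

    // Committing here waits on the pending producer futures, flushes state
    // stores and checkpoints offsets, so the next process() call does not
    // start until this message's output has been accepted by the broker.
    coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
  }
}

The trade-off, as you noted, is that committing per message gives up the batching that doWork() was buying you.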
Do let us know if you have more follow-ups.

Thanks,
Jagadish

On Wed, Mar 8, 2017 at 10:16 PM, Gaurav Agarwal <gauravagarw...@gmail.com> wrote:

> Hi Jagadish,
>
> Thank you for the very quick and detailed response.
>
> We have already set task.commit.ms = -1 and are using the checkpointing
> mechanism to accumulate some work in memory in order to do it more
> efficiently in batches. So the flow is
> process->process->....->doWork()->checkpoint->process..
> Doing checkpointing after every process call will defeat the above
> strategy that we have been following in our application.
>
> However, looking through the Kafka docs, I found the configuration
> 'batch.size' which says that "a batch size of zero will disable batching
> entirely". We can probably use this property to force the Kafka client to
> send every message inline.
> Does that sound reasonable to you?
>
> There is another related question here (and please excuse me if this is a
> stupid one!) -
>
> If there is any exception in Samza's KafkaSystemProducer.send() method,
> that exception is stored in the SourceData object. This exception is only
> notified back to the task on the next invocation of send().
> This is a bit puzzling: (a) the exception being thrown is for the send of
> a message which did not do anything wrong - the previous guy broke it!
> and (b) what if a checkpoint was called after calling send() of the
> message that caused the exception? Will we lose processing of that
> message? Wouldn't it be too late by the time the exception is thrown back
> to the client?
>
> The point I am driving at is: should the send() method in the
> MessageCollector interface provide an optional mechanism to operate it in
> strictly synchronous mode?
>
> On Thu, Mar 9, 2017 at 11:25 AM, Jagadish Venkatraman
> <jagadish1...@gmail.com> wrote:
>
> > Gaurav,
> >
> > I really appreciate your diligent walkthrough of the code base. Please
> > find my replies inline.
> >
> > *>> I am trying to figure out, how to make our Samza task processing
> > strictly ordered*
> >
> > By default, Samza offers you guaranteed in-order, at-least-once
> > processing out of the box (the same semantics as Kafka). To ensure that
> > each send is "acknowledged" by the broker, you can choose to invoke
> > Samza's *commit* at the end of processing every message.
> >
> > *>> We do not want to start processing of next message till it is
> > guaranteed that our previously emitted messages from samza tasks have
> > been accepted by Kafka broker. Is there any samza configuration that
> > will make this happen?*
> >
> > You can do the following:
> > A. Set task.commit.ms = -1 (this will disable auto-commit and allow you
> > to call manual commit).
> > B. At the end of every *process* or *window* call, invoke
> > *taskCoordinator.commit(RequestScope.CURRENT_TASK);*
> >
> > *>> The `MessageCollector` interface does not expose a 'flush()' method
> > that we could have called after doing a send() to ensure the delivery
> > of message to Kafka Broker.*
> >
> > This is intentional (to provide a single commit/flush API via the
> > *taskCoordinator* abstraction). Invoking *taskCoordinator.commit* will
> > wait on pending futures, flush buffers, flush state stores and
> > checkpoint offsets.
> >
> > Please let us know if we can be of more help!
> >
> > Thanks,
> > Jagadish
> >
> > On Wed, Mar 8, 2017 at 9:12 PM, Gaurav Agarwal
> > <gauravagarw...@gmail.com> wrote:
> >
> > > (correcting recipient address)
> > >
> > > On Thu, Mar 9, 2017 at 10:39 AM, Gaurav Agarwal
> > > <gauravagarw...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > We are trying to upgrade to Kafka 0.12.0. In the process we noticed
> > > > that the Kafka 0.10.0 KafkaProducer client api does not provide any
> > > > configuration to send() the messages synchronously. One needs to
> > > > wait on the returned Future for synchronous guarantees.
> > > >
> > > > I am trying to figure out how to make our Samza task processing
> > > > strictly ordered - i.e. we want to process an incoming message and
> > > > optionally write back some messages to kafka. We do not want to
> > > > start processing of the next message till it is guaranteed that our
> > > > previously emitted messages from samza tasks have been accepted by
> > > > the Kafka broker.
> > > >
> > > > Is there any samza configuration that will make this happen? The
> > > > `MessageCollector` interface does not expose a 'flush()' method
> > > > that we could have called after doing a send() to ensure the
> > > > delivery of the message to the Kafka Broker. (Note that
> > > > `TaskInstanceCollector` - a specific implementation of the
> > > > `MessageCollector` interface - does provide the required flush()
> > > > method.)
> > > >
> > > > --
> > > > cheers,
> > > > gaurav
> > >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
>

--
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University