>  we use the "synchronous" guarantee to ensure that the messages we emit from
Samza are delivered to Kafka linearly.

Samza already guarantees that for you: two *send()* calls that produce to the
same topic partition are *always* delivered in order, regardless of whether
you called commit, batched messages, or otherwise.

Do you need ordering of multiple send() calls across partitions? (We have not
had a scenario requiring that yet.)

Thanks,
Jagadish





On Thu, Mar 9, 2017 at 10:19 AM, Gaurav Agarwal <gauravagarw...@gmail.com>
wrote:

> There is one more case to consider: what if a single process() call sends
> multiple messages? Would we then need to call checkpoint after every send()
> inside the same process() call? That seems problematic: once we have
> checkpointed, there is no safety net against failures in subsequent send()
> calls from the same process() invocation.
>
> Thanks for being patient with my questions!
>
>
> ________________________________
> From: Gaurav Agarwal <gauravagarw...@gmail.com>
> Sent: Thursday, March 9, 2017 11:27:17 PM
> To: dev@samza.apache.org
> Cc: Mukul Gupta; Kshitij Gupta
> Subject: Re: Samza 0.12.0 + synchronous KafkaProducer ?
>
> Hi Jagadish, please find reply inline:
>
> (It appears that there is no easy way today to guarantee ordered delivery
> of messages to Kafka from Samza without giving up the checkpointing
> flexibility.)
>
> On Thu, Mar 9, 2017 at 11:01 PM, Jagadish Venkatraman <
> jagadish1...@gmail.com> wrote:
> Hi Gaurav,
>
> >> process->process->....->doWork()->checkpoint->process..
>
> What does *doWork()* do? Does it actually iterate over accumulated
> in-memory state, and send messages to Kafka?
> >> In process calls, some "work" gets accumulated in memory (think of the
> items as strings representing some work). Frequently, many process() calls
> produce identical work, so accumulating the items in a set has the effect
> of "deduping" them. In the window call, we actually process the accumulated
> work and call Samza's checkpoint explicitly after that.
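A minimal, stdlib-only sketch of the accumulate-and-dedupe pattern described
above (all class and method names here are hypothetical; the real task would
implement Samza's process()/window() callbacks and call taskCoordinator.commit
once the batch is done):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DedupBatchSketch {
    // Work items accumulated across process() calls; a Set collapses duplicates.
    private final Set<String> pendingWork = new LinkedHashSet<>();

    // Called once per incoming message.
    void process(String workItem) {
        pendingWork.add(workItem);
    }

    // Called periodically; drains the batch. The real task would do the
    // batched work here and then invoke Samza's commit explicitly.
    List<String> window() {
        List<String> batch = new ArrayList<>(pendingWork);
        pendingWork.clear();
        return batch;
    }

    public static void main(String[] args) {
        DedupBatchSketch task = new DedupBatchSketch();
        task.process("resize:image-1");
        task.process("resize:image-2");
        task.process("resize:image-1"); // identical work is deduped
        System.out.println(task.window()); // prints [resize:image-1, resize:image-2]
    }
}
```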
>
> *>> I found the configuration 'batch.size', which says that "a batch size
> of zero will disable batching entirely". We can probably use this property
> to force the Kafka client to send every message inline.*
>
> Yes, but I'm not entirely sure it will help your use-case. Just because
> "batch.size" is zero does not mean the "send" is synchronous. For instance,
> you can have an async send with a zero batch size: it is still possible
> (and likely) that the message has not made it to the broker when the send
> returns.
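To illustrate the distinction with a stdlib-only sketch (asyncSend below is a
hypothetical stand-in for KafkaProducer#send, which likewise returns a Future):
the send call returns immediately, and only blocking on the returned Future
guarantees the broker has acknowledged the record.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncSendSketch {
    // Stand-in for KafkaProducer#send: hands the record to a background
    // thread and returns immediately with a Future for the broker "ack".
    static Future<Long> asyncSend(ExecutorService ioThread, String record) {
        return ioThread.submit(() -> {
            Thread.sleep(50); // simulated network round-trip to the broker
            return 42L;       // simulated offset assigned by the broker
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        Future<Long> ack = asyncSend(ioThread, "payload");
        // Without this get(), the record may still be in flight even when
        // batching is disabled (batch.size = 0).
        long offset = ack.get(); // blocks until "acknowledged"
        System.out.println("acked at offset " + offset);
        ioThread.shutdown();
    }
}
```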
> >> Yes, you are correct. We would require stronger sync guarantees if we
> need to ensure ordered delivery of messages.
>
> *>> This exception is only notified back to the task on the next invocation
> of send(). This is a bit puzzling: (a) the exception being thrown is for
> the send of a message which did not do anything wrong.*
>
> Thanks for the diligent walkthrough. However, that's perfectly fine, as we
> still preserve Kafka's guarantee of at-least-once, in-order delivery.
>
> *>> What if a checkpoint was called after calling send() of the message
> that caused the exception? Will we lose processing of that message?
> Wouldn't it be too late by the time the exception is thrown back to the
> client?*
>
> Great observation! We will never lose processing of that message. The call
> to commit waits on the pending futures until they complete or one of them
> fails with an exception. In your specific case, the commit call will fail.
> On a restart you will replay from the previously check-pointed offset (as
> of the last successful commit).
> >> Thanks for clarifying this.
>
> Users have reported these exact issues when running Samza at scale, and
> they have since been fixed. Please refer to SAMZA-1069 and SAMZA-960.
>
> *>> The point I am driving at is: should the send() method in the
> MessageCollector interface provide an optional mechanism to operate in a
> strictly synchronous mode?*
>
> Even if you did, wouldn't you still need to tie it to the offset commits,
> changelog commits, store-buffer flushes to disk, etc. to achieve your
> guarantees across restarts? task.commit does exactly that.
> >> I think synchronous sends and checkpointing/commits are not entirely
> connected - they are two distinct aspects of the system. In the application
> use case that I've described, we use the "synchronous" guarantee to ensure
> that the messages we emit from Samza are delivered to Kafka linearly,
> whereas the checkpoint concept is being used to "batch" up some amount of
> work and then increment the offset once that batch is processed (which I
> believe is the original intent of checkpointing).
>
> If we used up "checkpointing" for guaranteeing ordered delivery, we would
> lose the capability of batching.
>
> Do let us know if you have more follow-ups.
>
> Thanks,
> Jagadish
>
> On Wed, Mar 8, 2017 at 10:16 PM, Gaurav Agarwal <gauravagarw...@gmail.com>
> wrote:
>
> > Hi Jagadish,
> >
> > Thank you for very quick and detailed response.
> >
> > We have already set task.commit.ms = -1 and are using the checkpointing
> > mechanism to accumulate some work in memory in order to do it more
> > efficiently in batches. So the flow is:
> > process->process->...->doWork()->checkpoint->process->...
> > Doing checkpointing after every process call would defeat the above
> > strategy that we have been following in our application.
> >
> > However, looking through the Kafka docs, I found the configuration
> > 'batch.size', which says that "a batch size of zero will disable batching
> > entirely". We can probably use this property to force the Kafka client to
> > send every message inline.
> > Does that sound reasonable to you?
> >
> > There is another related question here (and please excuse me if this is
> > stupid one!) -
> >
> > If there is any exception in Samza's KafkaSystemProducer.send() method,
> > that exception is stored in the SourceData object. The exception is only
> > notified back to the task on the next invocation of send().
> > This is a bit puzzling: (a) the exception being thrown is for the send of
> > a message which did not do anything wrong - the previous send broke it!
> > and (b) what if a checkpoint was called after calling send() of the
> > message that caused the exception? Would we lose processing of that
> > message? Wouldn't it be too late by the time the exception is thrown back
> > to the client?
> >
> > The point I am driving at is: should the send() method in the
> > MessageCollector interface provide an optional mechanism to operate in a
> > strictly synchronous mode?
> >
> > On Thu, Mar 9, 2017 at 11:25 AM, Jagadish Venkatraman <
> > jagadish1...@gmail.com> wrote:
> >
> > > Gaurav,
> > >
> > > I really appreciate your diligent walkthrough of the code base. Please
> > > find my replies inline.
> > >
> > > *>> I am trying to figure out how to make our Samza task processing
> > > strictly ordered.*
> > >
> > > By default, Samza offers you guaranteed in-order, at-least-once
> > > processing out of the box (the same semantics as Kafka). To ensure that
> > > each send is "acknowledged" by the broker, you can choose to invoke
> > > Samza's *commit* at the end of processing every message.
> > >
> > > *>> We do not want to start processing of the next message till it is
> > > guaranteed that our previously emitted messages from Samza tasks have
> > > been accepted by the Kafka broker. Is there any Samza configuration
> > > that will make this happen?*
> > >
> > > You can do the following:
> > > A. Set task.commit.ms = -1 (this disables auto-commit and allows you to
> > > commit manually).
> > > B. At the end of every *process* or *window* call, invoke
> > > *taskCoordinator.commit(RequestScope.CURRENT_TASK);*
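Concretely, the job config for step A would contain (a fragment; all other
required job settings omitted):

```
# Disable time-based auto-commit so the task controls when offsets advance.
task.commit.ms=-1
```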
> > >
> > >
> > > *>> The `MessageCollector` interface does not expose a 'flush()' method
> > > that we could have called after doing a send() to ensure the delivery
> > > of the message to the Kafka broker.*
> > >
> > > This is intentional (to provide a single commit/flush API via the
> > > *taskCoordinator* abstraction). Invoking *taskCoordinator.commit* will
> > > wait on pending futures, flush buffers, flush state stores, and
> > > checkpoint offsets.
> > >
> > > Please let us know if we can be of more help!
> > >
> > > Thanks,
> > > Jagadish
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Mar 8, 2017 at 9:12 PM, Gaurav Agarwal <
> > > gauravagarw...@gmail.com>
> > > wrote:
> > >
> > > > (correcting recipient address)
> > > >
> > > > On Thu, Mar 9, 2017 at 10:39 AM, Gaurav Agarwal <
> > > > gauravagarw...@gmail.com> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > We are trying to upgrade to Samza 0.12.0. In the process we noticed
> > > > > that the Kafka 0.10.0 KafkaProducer client API does not provide any
> > > > > configuration to send() messages synchronously. One needs to wait
> > > > > on the returned Future for synchronous guarantees.
> > > > >
> > > > > I am trying to figure out how to make our Samza task processing
> > > > > strictly ordered - i.e. we want to process an incoming message and
> > > > > optionally write back some messages to Kafka. We do not want to
> > > > > start processing of the next message till it is guaranteed that our
> > > > > previously emitted messages from Samza tasks have been accepted by
> > > > > the Kafka broker.
> > > > >
> > > > > Is there any Samza configuration that will make this happen? The
> > > > > `MessageCollector` interface does not expose a 'flush()' method
> > > > > that we could have called after doing a send() to ensure the
> > > > > delivery of a message to the Kafka broker. (Note that
> > > > > `TaskInstanceCollector`, a specific implementation of the
> > > > > `MessageCollector` interface, does provide the required flush()
> > > > > method.)
> > > > >
> > > > > --
> > > > > cheers,
> > > > > gaurav
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
>


-- 
Jagadish V,
Graduate Student,
Department of Computer Science,
Stanford University
