Thanks Yan.

Very good explanation on 1).

For 2), I understand that users can tune the batch size for the Kafka
producer. However, that doesn't change the number of messages sent to the
changelog topic. In our case, we process a high-volume log (1.5MM
records/second) and update the kv-store for each message, which causes the
changelog to grow to the same size as the input log. Even with compaction
turned on for the changelog, it is not very scalable. I am wondering if
there is a way to mitigate this problem.
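One mitigation we have been considering is to coalesce updates in memory,
keyed by record key, and flush to the kv-store only periodically, so a hot
key produces one changelog entry per flush instead of one per input record.
A rough sketch (this buffer class is hypothetical, not part of Samza's API;
the flush would be driven by something like a periodic window callback):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical write-behind buffer: coalesces repeated updates to the
// same key so only the latest value reaches the kv-store (and hence
// the changelog) on each flush.
public class WriteBehindBuffer<K, V> {
    private final Map<K, V> pending = new HashMap<>();

    // Record an update in memory; nothing is written downstream yet.
    public void put(K key, V value) {
        pending.put(key, value);
    }

    // Drain the coalesced entries for writing to the real store
    // (the actual store write is left out of this sketch).
    public Map<K, V> flush() {
        Map<K, V> snapshot = new HashMap<>(pending);
        pending.clear();
        return snapshot;
    }
}
```

The trade-off is that updates buffered since the last flush are lost on a
crash, which weakens the at-least-once guarantee for the store contents.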


On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang...@gmail.com> wrote:

> Hi Chen Song,
>
> There are two different concepts: *checkpoint* and *changelog*. The checkpoint
> is for the offsets of the messages, while the changelog is for the kv-store.
> The code snippet you showed is for the checkpoint, not for the changelog.
>
> {quote}
> 1. When implementing our Samza task, does each call of the process method
> trigger a call to TaskInstance.commit?
> {quote}
>
> TaskInstance.commit triggers the *checkpoint*. It is triggered every
> task.commit.ms (default is 60000 ms). The code is here
> <
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166
> >
> . Basically, the RunLoop class calls the commit method, but only triggers
> the commit behavior once per configured interval.
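> For example, in the job's properties file (60000 ms is the default; the
> value below is just illustrative):
>
> ```
> task.commit.ms=60000
> ```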
>
> If you are talking about the *changelog*, it's not controlled by the commit
> method. Instead, every put/delete calls the "send
> <
> https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51
> >"
> of the SystemProducer (code is here
> <
> https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66
> >).
> How often "send" actually *sends* to the broker (e.g. Kafka) depends on
> your producer's configuration. For example, in Kafka, you can have the
> producer send in batches (async setting) or one message at a time (sync
> setting). In other words, it is left to the System to decide how to
> handle the "send" method.
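> For example, a Samza job config can pass batching settings through to the
> Kafka producer like this (the system name "kafka" and the values are just
> illustrative):
>
> ```
> systems.kafka.producer.producer.type=async
> systems.kafka.producer.batch.num.messages=5000
> systems.kafka.producer.queue.buffering.max.ms=1000
> ```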
>
>
> {quote}
> 2. Is there a way to buffer these commit activities in memory and flush
> periodically? Our job is joining >1mm messages per second using a KV store
> and we have a lot of concerns about the changelog size, since in the worst
> case the changelog will grow as fast as the input log.
> {quote}
>
> If you are talking about the checkpoint, you can change task.commit.ms.
>
> If you are thinking of the changelog (kv-store), you can change the
> producer's config to batch a few changes before sending them to the broker.
>
> I think the folks in the community with more operational experience can
> tell you what the best practice is.
>
> Thanks,
>
> Fang, Yan
> yanfang...@gmail.com
>
> On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com> wrote:
>
> > We are trying to understand the order of commits when processing each
> > message in a Samza job.
> >
> > T1: input offset commit
> > T2: changelog commit
> > T3: output commit
> >
> > By looking at the code snippet in
> >
> >
> https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171
> > ,
> > my understanding is that for each input message, Samza always sends the
> > update message to the changelog, sends the output message, and then
> > commits the input offset. It makes sense to me at a high level in terms
> > of at-least-once processing.
> >
> > Specifically, we have two dumb questions:
> >
> > 1. When implementing our Samza task, does each call of the process method
> > trigger a call to TaskInstance.commit?
> > 2. Is there a way to buffer these commit activities in memory and flush
> > periodically? Our job is joining >1mm messages per second using a KV store
> > and we have a lot of concerns about the changelog size, since in the worst
> > case the changelog will grow as fast as the input log.
> >
> > Chen
> >
> > --
> > Chen Song
> >
>



-- 
Chen Song
