Thanks Yi.

Another follow-up question. If 'checkpoint' and 'changelog' are managed
separately (per your explanation above), how do we support at-least-once
processing?

For example, what if the task commits offsets to the checkpoint topic, but
the task crashes before the producer has sent all of the data in its buffer
to the changelog topic?
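For concreteness, here is a toy model (plain Python, not the real Samza API; the class and field names are made up for illustration) of why a commit must flush the changelog producer before writing the offset checkpoint:

```python
# Toy model of the commit sequence. The key point: the changelog producer is
# flushed *before* the offset checkpoint is written. If the task crashes with
# records still in the producer buffer, the checkpoint has not advanced, so on
# restart the input is re-read and the lost changelog entries are regenerated
# (at-least-once: duplicates are possible, loss is not).

class Task:
    def __init__(self):
        self.changelog_buffer = []   # producer's in-memory batch
        self.changelog_topic = []    # what actually reached the broker
        self.checkpointed_offset = -1
        self.last_offset = -1

    def process(self, offset, msg):
        # Async "send": the update only goes to the producer's buffer.
        self.changelog_buffer.append((offset, msg))
        self.last_offset = offset

    def flush_changelog(self):
        self.changelog_topic.extend(self.changelog_buffer)
        self.changelog_buffer.clear()

    def commit(self):
        # Safe order: flush the producer first, then checkpoint offsets.
        self.flush_changelog()
        self.checkpointed_offset = self.last_offset

task = Task()
for offset, msg in enumerate(["a", "b", "c"]):
    task.process(offset, msg)

# A crash *here* loses the buffer, but the checkpoint was never advanced,
# so restart resumes from checkpointed_offset + 1 and reprocesses a, b, c.
assert task.checkpointed_offset == -1

task.commit()
assert task.checkpointed_offset == 2
assert len(task.changelog_topic) == 3
```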

Chen

On Tue, Aug 4, 2015 at 12:54 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Chen,
>
> So, is your goal to improve the throughput to the changelog topic, or to
> reduce the size of the changelog topic? If you are targeting the latter and
> your KV-store truly is the size of the input log, I don't see how that is
> possible. In a lot of use cases, users only need to retain a *recent* time
> window of the input log. In that case, you can periodically purge the
> expired records from the KV-store to reduce the size (of both the KV-store
> and the changelog).
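A minimal sketch of that periodic-purge idea (plain Python; a dict stands in for the Samza KeyValueStore, and the TTL and the timestamp-in-value layout are assumptions — in a real job this would typically run from WindowableTask.window() and each delete would also write a tombstone to the changelog):

```python
import time

TTL_SECONDS = 24 * 3600  # hypothetical retention window

def purge_expired(store, now=None):
    """Delete entries older than TTL_SECONDS. This shrinks the store, and
    with a compacted changelog it also shrinks the changelog's live key set,
    since deletes become tombstones that compaction eventually drops."""
    now = now if now is not None else time.time()
    expired = [k for k, (ts, _) in store.items() if now - ts > TTL_SECONDS]
    for k in expired:
        del store[k]
    return len(expired)

# Values are (write_timestamp, payload) pairs.
store = {"old": (0, "v1"), "fresh": (1000000, "v2")}
purged = purge_expired(store, now=1000001)
```

Here `purged` is 1 and only the "fresh" entry survives.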
>
> Regards,
> -Yi
>
> On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <chen.song...@gmail.com> wrote:
>
> > Thanks Yan.
> >
> > Very good explanation on 1).
> >
> > For 2), I understand that users can tune the size of the batch for the
> > Kafka producer. However, that doesn't change the number of messages sent
> > to the changelog topic. In our case, we process a high-volume log (1.5MM
> > records/second) and update the KV store for each message, which causes
> > the changelog to grow to the same size as the input log. Even with
> > compaction turned on for the changelog, it is not very scalable. I am
> > wondering if there is a way to mitigate this problem.
> >
> >
> > On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang...@gmail.com> wrote:
> >
> > > Hi Chen Song,
> > >
> > > There are two different concepts: *checkpoint* and *changelog*. The
> > > checkpoint is for the offsets of the messages, while the changelog is
> > > for the kv-store. The code snippet you show is for the checkpoint, not
> > > for the changelog.
> > >
> > > {quote}
> > > 1. When implementing our Samza task, does each call of the process
> > > method trigger a call to TaskInstance.commit?
> > > {quote}
> > >
> > > TaskInstance.commit triggers the *checkpoint*. It is triggered every
> > > task.commit.ms (default is 60000 ms). The code is here
> > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
> > > Basically, the RunLoop class calls the commit method, but it only
> > > triggers the commit behavior at the configured interval.
> > >
> > > If you are talking about the *changelog*, it's not controlled by the
> > > commit method. Instead, every put/delete calls the "send"
> > > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>
> > > of the SystemProducer (code is here
> > > <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
> > > In terms of how often "send" really *sends* to the broker (e.g. Kafka),
> > > it depends on your producer's configuration. For example, in Kafka, you
> > > can have the producer send a batch (setting async) or send one message
> > > at a time (setting sync). What this means is that it is left to the
> > > System implementation to decide how to deal with the "send" method.
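As an illustration, batching the changelog producer in a Samza job config might look like this (hypothetical values; these are the Kafka 0.8-era producer settings Samza passes through under `systems.<system>.producer.*` — newer Kafka clients use `batch.size` and `linger.ms` instead, so check the producer docs for your version):

```
# Hypothetical Samza job config fragment: async producer, larger batches.
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=500
systems.kafka.producer.queue.buffering.max.ms=1000
```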
> > >
> > >
> > > {quote}
> > > 2. Is there a way to buffer these commit activities in memory and flush
> > > periodically? Our job is joining >1mm messages per second using a KV
> > > store, and we have a lot of concern about the changelog size, as in the
> > > worst case the changelog will grow as fast as the input log.
> > > {quote}
> > >
> > > If you are talking about the checkpoint, you can change task.commit.ms.
> > >
> > > If you are thinking of the changelog (kv-store), you can change the
> > > producer's config to batch several changes before sending to the broker.
> > >
> > > I think the folks in the community with more operational experience can
> > > tell you what the best practice is.
> > >
> > > Thanks,
> > >
> > > Fang, Yan
> > > yanfang...@gmail.com
> > >
> > > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com>
> > wrote:
> > >
> > > > We are trying to understand the order of commits when processing each
> > > > message in a Samza job:
> > > >
> > > > T1: input offset commit
> > > > T2: changelog commit
> > > > T3: output commit
> > > >
> > > > By looking at the code snippet in
> > > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171>,
> > > > my understanding is that for each input message, Samza always sends
> > > > the update message to the changelog, sends the output message, and
> > > > then commits the input offset. That makes sense to me at a high level
> > > > in terms of at-least-once processing.
> > > >
> > > > Specifically, we have two dumb questions:
> > > >
> > > > 1. When implementing our Samza task, does each call of the process
> > > > method trigger a call to TaskInstance.commit?
> > > > 2. Is there a way to buffer these commit activities in memory and
> > > > flush periodically? Our job is joining >1mm messages per second using
> > > > a KV store, and we have a lot of concern about the changelog size, as
> > > > in the worst case the changelog will grow as fast as the input log.
> > > >
> > > > Chen
> > > >
> > > > --
> > > > Chen Song
> > > >
> > >
> >
> >
> >
> > --
> > Chen Song
> >
>



-- 
Chen Song
