Hi, Chen,

At-least-once semantics is guaranteed by committing the offsets as the last step of the commit sequence. Hence, flushing to local disk, the changelog, and the output topics must all succeed before the offsets are committed to the checkpoint. If anything fails in between, the offsets are not committed and the messages are replayed, per the definition of at-least-once semantics.
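To make that ordering concrete, here is a minimal sketch of a commit sequence that checkpoints last; the class and method names are illustrative, not Samza's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the at-least-once commit ordering described
// above; these classes are hypothetical, not Samza's real internals.
public class CommitSketch {
    interface Flushable { void flush() throws Exception; }

    static final List<String> committedOffsets = new ArrayList<>();

    // The checkpoint (offset commit) happens only after every flush
    // step has succeeded. If any flush throws, the offset is never
    // committed, so the input is replayed on restart.
    static void commit(String offset, List<Flushable> steps) {
        try {
            for (Flushable step : steps) {
                step.flush(); // kv-store, changelog, output topics
            }
            committedOffsets.add(offset); // last step: checkpoint
        } catch (Exception e) {
            // Offset intentionally NOT committed -> replay on restart.
        }
    }

    public static void main(String[] args) {
        Flushable ok = () -> {};
        Flushable failing = () -> { throw new Exception("changelog down"); };

        // A failing changelog flush prevents the offset commit.
        commit("offset-42", List.of(ok, failing));
        System.out.println(committedOffsets.contains("offset-42")); // prints "false"

        // When every flush succeeds, the offset is committed.
        commit("offset-43", List.of(ok, ok));
        System.out.println(committedOffsets.contains("offset-43")); // prints "true"
    }
}
```

Because the checkpoint write is strictly last, a crash at any earlier step leaves the old offset in place; reprocessing from that offset is what produces duplicates rather than data loss.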
Best,
-Yi

On Tue, Aug 25, 2015 at 9:04 AM, Chen Song <chen.song...@gmail.com> wrote:

> Thanks Yi.
>
> Another follow-up question. If 'checkpoint' and 'changelog' are managed
> separately (according to your explanation above), how do we support
> at-least-once processing?
>
> For example, what if the task commits offsets to the checkpoint topic but
> the producer doesn't send all data in its buffer to the changelog topic
> and the task crashes?
>
> Chen
>
> On Tue, Aug 4, 2015 at 12:54 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Chen,
> >
> > So, is your goal to improve the throughput to the changelog topic or to
> > reduce the size of the changelog topic? If you are targeting the latter
> > and your KV-store truly is of the size of the input log, I don't see how
> > that is possible. In a lot of use cases, users only need to retain a
> > *recent* time window of the input log. In that case, you can choose to
> > periodically purge the expired records in the KV-store to reduce the
> > size (both for the KV-store and the changelog).
> >
> > Regards,
> > -Yi
> >
> > On Tue, Aug 4, 2015 at 7:25 AM, Chen Song <chen.song...@gmail.com> wrote:
> >
> > > Thanks Yan.
> > >
> > > Very good explanation on 1).
> > >
> > > For 2), I understand that users can tune the size of the batch for the
> > > Kafka producer. However, that doesn't change the number of messages
> > > sent to the changelog topic. In our case, we process a high-volume log
> > > (1.5MM records/second), update the kv-store for each message, and this
> > > causes the changelog to grow to the same size as the input log. Even
> > > with compaction turned on for the changelog, it is not very scalable.
> > > I am wondering if there is a way to mitigate this problem.
> > >
> > > On Wed, Jul 22, 2015 at 2:12 PM, Yan Fang <yanfang...@gmail.com> wrote:
> > >
> > > > Hi Chen Song,
> > > >
> > > > There are two different concepts: *checkpoint* and *changelog*.
> > > > Checkpoint is for the offsets of the messages, while the changelog
> > > > is for the kv-store. The code snippet you show is for the
> > > > checkpoint, not for the changelog.
> > > >
> > > > {quote}
> > > > 1. When implementing our Samza task, does each call of the process
> > > > method trigger a call to TaskInstance.commit?
> > > > {quote}
> > > >
> > > > TaskInstance.commit triggers the *checkpoint*. It is triggered every
> > > > task.commit.ms (default is 60000 ms). The code is here
> > > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala#L149-166>.
> > > > Basically, the RunLoop class calls the commit method, but only
> > > > triggers the commit behavior at the configured interval.
> > > >
> > > > If you are talking about the *changelog*, it's not controlled by the
> > > > commit method. Instead, every put/delete calls the "send
> > > > <https://github.com/apache/samza/blob/master/samza-api/src/main/java/org/apache/samza/system/SystemProducer.java#L51>"
> > > > of the SystemProducer (code is here
> > > > <https://github.com/apache/samza/blob/master/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala#L62-L66>).
> > > > In terms of how often the "send" really *sends* to the broker (e.g.
> > > > Kafka), it depends on your producer's configuration. For example,
> > > > in Kafka, you can have the producer send a batch (setting async),
> > > > or send one msg at a time (setting sync). What it means is that it
> > > > leaves the System to decide how to deal with the "send" method.
> > > >
> > > > {quote}
> > > > 2. Is there a way to buffer these commit activities in memory and
> > > > flush periodically? Our job is joining >1mm messages per second
> > > > using a KV store and we have a lot of concern for the changelog
> > > > size, as in the worst case, the changelog will grow as fast as the
> > > > input log.
> > > > {quote}
> > > >
> > > > If you are talking about the checkpoint, you can change
> > > > task.commit.ms.
> > > >
> > > > If you are thinking of the changelog (kv-store), you can change the
> > > > producer's config to batch a few changes and send them to the
> > > > broker.
> > > >
> > > > I think the guys in the community with more operational experience
> > > > are able to tell you what the best practice is.
> > > >
> > > > Thanks,
> > > >
> > > > Fang, Yan
> > > > yanfang...@gmail.com
> > > >
> > > > On Wed, Jul 22, 2015 at 9:00 AM, Chen Song <chen.song...@gmail.com> wrote:
> > > >
> > > > > We are trying to understand the order of commits when processing
> > > > > each message in a Samza job.
> > > > >
> > > > > T1: input offset commit
> > > > > T2: changelog commit
> > > > > T3: output commit
> > > > >
> > > > > By looking at the code snippet in
> > > > > <https://github.com/apache/samza/blob/master/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L155-L171>,
> > > > > my understanding is that for each input message, Samza always
> > > > > sends the update message on the changelog, sends the output
> > > > > message, and then commits the input offset. It makes sense to me
> > > > > at a high level in terms of at-least-once processing.
> > > > >
> > > > > Specifically, we have two dumb questions:
> > > > >
> > > > > 1. When implementing our Samza task, does each call of the
> > > > > process method trigger a call to TaskInstance.commit?
> > > > > 2. Is there a way to buffer these commit activities in memory and
> > > > > flush periodically? Our job is joining >1mm messages per second
> > > > > using a KV store and we have a lot of concern for the changelog
> > > > > size, as in the worst case, the changelog will grow as fast as
> > > > > the input log.
> > > > >
> > > > > Chen
> > > > >
> > > > > --
> > > > > Chen Song
> > >
> > > --
> > > Chen Song
>
> --
> Chen Song
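For reference, the two tuning knobs discussed in the thread (the checkpoint interval and producer batching for the changelog) both live in the job's configuration. A sketch, assuming a Kafka system named "kafka": task.commit.ms and its 60000 ms default come from the thread above, but the producer property names shown are illustrative for the 0.8-era Kafka producer being discussed and should be verified against the Samza and Kafka docs for your version:

```properties
# Checkpoint: how often offsets are committed (default 60000 ms,
# per the thread above).
task.commit.ms=60000

# Changelog: put/delete calls go through the system producer, so
# batching is governed by producer config passed through the
# systems.<system>.producer.* prefix. The names below are assumptions
# based on the old (0.8-era) Kafka producer -- check your version.
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
```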