Chris,

Thank you for this really great response. I don't need this right now, but
I wanted to understand the limitations. If Kafka could guarantee to keep
the last N hours dirty, that would provide a foundation to build on.
Thanks,

Roger

On Fri, Feb 20, 2015 at 8:16 AM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey Roger,
>
> I believe your description is correct. Kafka has a "dirty ratio" concept
> for its log-compacted topics. Once the dirty (unclean) portion of the log
> passes the "dirty ratio" (e.g. 50% of the log hasn't been cleaned, based
> on bytes on disk), the log cleaner kicks in. Once this happens, you can't
> restore to prior intermediate states.
>
> There has been some verbal discussion about changing the concept of a
> "dirty ratio" to be time-based. Something like, "always keep the last N
> hours dirty". This would allow you to restore to any state within the
> last N hours (but not prior, as you point out). The nice thing about this
> approach is that it gives a very clear guarantee about how far back you
> can go.
>
> If you wanted to restore to a point in time prior to the last cleaning,
> then you'd have to keep some sort of snapshot, as you suggest. There
> might be other tricks you could play, but nothing is coming to mind.
>
> Where this gets a little funky for Samza is in a case where a state
> store's changelog is ahead of the last checkpointed input offsets. In an
> ideal world, you'd like to restore up to the offset in the changelog that
> matched the point where the input offsets were committed. For example, if
> you have a message order like so:
>
> cl1, cl2, cl3, cp1, cl4, cl5, <fail>
>
> When you start up, you'd like to read the changelog up to cl3, and start
> the consumers at offset cp1. If your processing is idempotent and
> deterministic, then cl4 and cl5 will get over-written, and you'll have
> consistent state moving forward. In practice, we don't do this. If a log
> cleaner were to kick in, though, and delete cl3 (say it has the same key
> as cl4) before you restart, then you *can't* restore to this point.
>
> In practice we haven't really worried about this because, as it is, we
> simply restore up to cl5 on restart. There is a lot of complexity in not
> restoring up to cl5:
>
> 1. What if someone is consuming from the changelog topic? You can't
> simply truncate the changelog on restart.
> 2. What if the log cleaner has deleted cl3, or prior changelog messages
> (as you suggested)?
> 3. What if your processing is non-deterministic or non-idempotent?
>
> The most ideal scenario is simply to have Kafka support transactions.
> This would allow us to atomically commit cp1 and all output (both
> changelog and regular). This would at least guarantee a state-consistent
> restore for the *current* checkpoint. Going backwards in time would have
> to be addressed by the time-based dirty policy that I described above.
>
> In the absence of this, we *could* make the checkpoints contain changelog
> offsets, and only restore up to that point on restart (cl3 in the
> example). We haven't done this, but I think it's do-able. This would at
> least make things restore properly for the *current* checkpoint, when the
> processing is *deterministic* and *idempotent*. You can open a JIRA for
> this if you need it.
>
> Cheers,
> Chris
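To make the restore-to-checkpoint idea above concrete, here is a rough
sketch of what that replay could look like. This is not what Samza does
today (it restores to the head of the changelog); the newer Kafka consumer
API shown here post-dates this thread, and the helper name and store wiring
are purely illustrative.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.storage.kv.KeyValueStore;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CheckpointedChangelogRestore {

    /**
     * Replays one changelog partition from the beginning up to (and including)
     * the changelog offset recorded in the last checkpoint, then stops.
     * consumerProps must include bootstrap servers and byte[] deserializers.
     */
    static void restoreUpToCheckpoint(Properties consumerProps,
                                      TopicPartition changelogPartition,
                                      long checkpointedChangelogOffset,
                                      KeyValueStore<byte[], byte[]> store) {
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.assign(Collections.singletonList(changelogPartition));
            consumer.seekToBeginning(Collections.singletonList(changelogPartition));

            long endOffset = consumer.endOffsets(Collections.singletonList(changelogPartition))
                                     .get(changelogPartition);
            // Stop at whichever comes first: just past the checkpointed offset
            // (cl3 in the example) or the current end of the changelog.
            long stopOffset = Math.min(endOffset, checkpointedChangelogOffset + 1);

            while (consumer.position(changelogPartition) < stopOffset) {
                for (ConsumerRecord<byte[], byte[]> record :
                        consumer.poll(Duration.ofMillis(500))) {
                    if (record.offset() >= stopOffset) {
                        return; // cl4 and cl5 are deliberately not applied
                    }
                    if (record.value() == null) {
                        store.delete(record.key()); // tombstone in a compacted topic
                    } else {
                        store.put(record.key(), record.value());
                    }
                }
            }
        }
    }
}

The key point is the stop condition: anything written after the checkpoint
(cl4, cl5) is skipped and, assuming deterministic and idempotent
processing, gets rewritten once the consumers resume at cp1.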
> On Thu, Feb 19, 2015 at 6:55 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Chris + Samza Devs,
> >
> > I was wondering whether Samza could support re-processing as described
> > by the Kappa architecture or Liquid (
> > http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
> >
> > It seems that a changelog is not sufficient to be able to restore state
> > backward in time. Kafka compaction will guarantee that local state can
> > be restored from where it left off, but I don't see how it can restore
> > past state.
> >
> > Imagine the case where a stream job has a lot of state in its local
> > store but it has not updated any keys in a long time.
> >
> > Time t1: All of the data would be in the tail of the Kafka log (past
> > the cleaner point).
> > Time t2: The job updates some keys. Now we're in a state where the next
> > compaction will blow away the old values for those keys.
> > Time t3: Compaction occurs and the old values are discarded.
> >
> > Say we want to launch a re-processing job that would begin from t1. If
> > we launch that job before t3, it will correctly restore its state.
> > However, if we launch the job after t3, it will be missing old values,
> > right?
> >
> > Unless I'm misunderstanding something, the only way around this is to
> > keep snapshots in addition to the changelog. Has there been any
> > discussion of providing an option in Samza of taking RocksDB snapshots
> > and persisting them to an object store or HDFS?
> >
> > Thanks,
> >
> > Roger
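And a very rough sketch of the snapshot idea, in case it helps the
discussion: take a RocksDB checkpoint (a consistent, hard-linked copy of
the SST files) and copy it to HDFS, tagged with the input offsets it
corresponds to. This is not an existing Samza feature; the RocksJava
Checkpoint API and Hadoop FileSystem calls are real, but the helper, paths,
and naming scheme are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.rocksdb.Checkpoint;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import java.io.IOException;

public class RocksDbSnapshotter {

    /** Creates a consistent local RocksDB checkpoint and uploads it to HDFS. */
    static void snapshotToHdfs(RocksDB db, String localCheckpointDir,
                               String hdfsSnapshotDir, long checkpointedInputOffset)
            throws RocksDBException, IOException {
        // 1. Local snapshot; cheap because it hard-links the immutable SST files.
        try (Checkpoint checkpoint = Checkpoint.create(db)) {
            checkpoint.createCheckpoint(localCheckpointDir);
        }

        // 2. Ship it to durable storage, keyed by the checkpointed input offset
        //    so a re-processing job can pick the snapshot that matches its
        //    starting point (t1 in the example above).
        FileSystem fs = FileSystem.get(new Configuration());
        Path dest = new Path(hdfsSnapshotDir, "snapshot-" + checkpointedInputOffset);
        fs.copyFromLocalFile(new Path(localCheckpointDir), dest);
    }
}

A job re-processing from t1 would then download the newest snapshot taken
at or before t1, restore the store from it, and replay from there, which
sidesteps the problem of compaction discarding the old values at t3.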