Chris,

Thank you for this really great response.  I don't need this right now, but
I wanted to understand the limitations.  If Kafka could guarantee to keep
the last N hours dirty, that would provide a foundation to build on.

Thanks,

Roger

On Fri, Feb 20, 2015 at 8:16 AM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey Roger,
>
> I believe your description is correct. Kafka has a "dirty ratio" concept
> for its log-compacted topics. Once the dirty (unclean) portion of the log
> passes the "dirty ratio" (e.g. 50% of the log hasn't been clean, based on
> bytes on disk), the log cleaner kicks in. Once this happens, you can't
> restore to prior intermediate states.
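>
> To make those knobs concrete, here is a rough sketch of creating a compacted
> topic with an explicit dirty ratio. This is purely illustrative (the
> AdminClient API shown is from newer Kafka clients, and the topic name,
> partition count, and values are made up); the two configs that matter are
> "cleanup.policy" and "min.cleanable.dirty.ratio":
>
>   import java.util.Collections;
>   import java.util.HashMap;
>   import java.util.Map;
>   import java.util.Properties;
>   import org.apache.kafka.clients.admin.AdminClient;
>   import org.apache.kafka.clients.admin.NewTopic;
>
>   public class CreateCompactedTopic {
>     public static void main(String[] args) throws Exception {
>       Properties props = new Properties();
>       props.put("bootstrap.servers", "localhost:9092");
>
>       Map<String, String> configs = new HashMap<>();
>       configs.put("cleanup.policy", "compact");          // log-compacted topic
>       configs.put("min.cleanable.dirty.ratio", "0.5");   // clean once >= 50% of the log is dirty
>
>       try (AdminClient admin = AdminClient.create(props)) {
>         admin.createTopics(Collections.singletonList(
>             new NewTopic("my-changelog", 8, (short) 1).configs(configs)))
>           .all().get();
>       }
>     }
>   }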
>
> There has been some verbal discussion about changing the concept of a
> "dirty ratio" to be time-based. Something like, "always keep the last N
> hours dirty". This would allow you to restore to any state within the last
> N hours (but not prior, as you point out). The nice thing about this
> approach is that it gives a very clear guarantee about how far back you can
> go.
>
> If you wanted to restore to a point in time prior to the last cleaning,
> then you'd have to keep some sort of snapshot, as you suggest. There might
> be other tricks you could play, but nothing is coming to mind.
>
> Where this gets a little funky for Samza is in a case where a state store's
> changelog is ahead of the last checkpointed input offsets. In an ideal
> world, you'd like to restore up to the offset in the changelog that matched
> the point where the input offsets were committed. For example, if you have
> a message order like so:
>
>   cl1, cl2, cl3, cp1, cl4, cl5, <fail>
>
> When you start up, you'd like to read the changelog up to cl3, and start
> the consumers at offset cp1. If your processing is idempotent and
> deterministic, then cl4 and cl5 will get over-written, and you'll have
> consistent state moving forward. In practice, we don't do this. If a log
> cleaner were to kick in, though, and delete cl3 (say it has the same key as
> cl4) before you restart, then you *can't* restore to this point.
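>
> If you did want to restore only up to the changelog offset recorded with the
> checkpoint, the restore loop would look roughly like the sketch below. To be
> clear, this is not what Samza does today; the method name, the stop offset,
> and the storePut callback are stand-ins for illustration:
>
>   import java.time.Duration;
>   import java.util.Collections;
>   import java.util.function.BiConsumer;
>   import org.apache.kafka.clients.consumer.ConsumerRecord;
>   import org.apache.kafka.clients.consumer.KafkaConsumer;
>   import org.apache.kafka.common.TopicPartition;
>
>   public class ChangelogRestoreSketch {
>     /** Replay the changelog from the beginning up to (and including) the
>      *  offset recorded in the checkpoint, applying each record to the store. */
>     static void restoreUpTo(KafkaConsumer<byte[], byte[]> consumer,
>                             TopicPartition changelog,
>                             long checkpointedChangelogOffset,      // cl3 in the example
>                             BiConsumer<byte[], byte[]> storePut) { // local store put()
>       consumer.assign(Collections.singletonList(changelog));
>       consumer.seekToBeginning(Collections.singletonList(changelog));
>       long logEnd = consumer.endOffsets(Collections.singletonList(changelog)).get(changelog);
>       boolean done = false;
>       while (!done && consumer.position(changelog) < logEnd) {
>         for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofMillis(500))) {
>           if (rec.offset() > checkpointedChangelogOffset) { done = true; break; }
>           storePut.accept(rec.key(), rec.value());
>         }
>       }
>       // After this, the input consumer would be seek()'d back to the
>       // checkpointed input offset (cp1 above) before processing resumes.
>     }
>   }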
>
> In practice we haven't really worried about this because, as it is, we
> simply restore up to cl5 on restart. There is a lot of complexity in not
> restoring all the way up to cl5:
>
> 1. What if someone is consuming from the changelog topic? You can't simply
> truncate the changelog on restart.
> 2. What if the log cleaner has deleted cl3, or prior CL messages (as you
> suggested)?
> 3. What if your processing is non-deterministic or non-idempotent?
>
> The ideal scenario is simply to have Kafka support transactions. This
> would allow us to atomically commit cp1 and all output (both changelog and
> regular). This would at least guarantee state-consistent restore for the
> *current* checkpoint. Going backwards in time would have to be addressed by
> the time-based dirty policy that I described above.
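>
> For what it's worth, here is a hypothetical sketch of what such an atomic
> commit could look like with a transactional producer. The API shape, topic
> names, group id, and payloads below are all assumptions for illustration:
>
>   import java.util.Collections;
>   import java.util.Properties;
>   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>   import org.apache.kafka.common.TopicPartition;
>
>   public class AtomicCommitSketch {
>     public static void main(String[] args) throws Exception {
>       Properties props = new Properties();
>       props.put("bootstrap.servers", "localhost:9092");
>       props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>       props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>       props.put("transactional.id", "my-task-0");   // one transactional id per task
>
>       byte[] key = "some-key".getBytes();
>       byte[] changelogValue = "new-store-value".getBytes();
>       byte[] outputValue = "derived-output".getBytes();
>       long nextInputOffset = 43L;   // offset the input consumer resumes from (cp1)
>
>       try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
>         producer.initTransactions();
>         producer.beginTransaction();
>         try {
>           // The changelog write, the regular output, and the input offset
>           // commit all become visible together, or not at all.
>           producer.send(new ProducerRecord<>("my-store-changelog", key, changelogValue));
>           producer.send(new ProducerRecord<>("output-topic", key, outputValue));
>           producer.sendOffsetsToTransaction(
>               Collections.singletonMap(new TopicPartition("input-topic", 0),
>                                        new OffsetAndMetadata(nextInputOffset)),
>               "my-consumer-group");
>           producer.commitTransaction();
>         } catch (Exception e) {
>           producer.abortTransaction();
>           throw e;
>         }
>       }
>     }
>   }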
>
> In the absence of this, we *could* make the checkpoints contain changelog
> offsets, and only restore up to that point on restart (cl3 in the example).
> We haven't done this, but I think it's do-able. This will at least make
> things restore properly for the *current* checkpoint, when the processing
> is *deterministic* and *idempotent*. You can open a JIRA for this if you
> need it.
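>
> Concretely, the changelog offset to put in the checkpoint could come from
> the producer's ack metadata at commit time. A rough sketch (topic name and
> surrounding plumbing are assumed, not how Samza wires this up today):
>
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>   import org.apache.kafka.clients.producer.RecordMetadata;
>
>   public class ChangelogOffsetForCheckpoint {
>     public static void main(String[] args) throws Exception {
>       Properties props = new Properties();
>       props.put("bootstrap.servers", "localhost:9092");
>       props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>       props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>       try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
>         RecordMetadata md = producer.send(new ProducerRecord<>(
>             "my-store-changelog", "key".getBytes(), "value".getBytes())).get();
>         // Record md.offset() in the checkpoint alongside the input offsets,
>         // so a restart can replay the changelog only up to this point.
>         long changelogOffsetToCheckpoint = md.offset();
>         System.out.println("changelog offset for checkpoint: " + changelogOffsetToCheckpoint);
>       }
>     }
>   }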
>
> Cheers,
> Chris
>
> On Thu, Feb 19, 2015 at 6:55 PM, Roger Hoover <roger.hoo...@gmail.com>
> wrote:
>
> > Chris + Samza Devs,
> >
> > I was wondering whether Samza could support re-processing as described by
> > the Kappa architecture or Liquid (
> > http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
> >
> > It seems that a changelog is not sufficient to be able to restore state
> > backward in time.  Kafka compaction will guarantee that local state can
> > be restored from where it left off, but I don't see how it can restore
> > past state.
> >
> > Imagine the case where a stream job has a lot of state in its local
> > store but it has not updated any keys in a long time.
> >
> > Time t1: All of the data would be in the tail of the Kafka log (past the
> > cleaner point).
> > Time t2:  The job updates some keys.  Now we're in a state where the
> > next compaction will blow away the old values for those keys.
> > Time t3:  Compaction occurs and old values are discarded.
> >
> > Say we want to launch a re-processing job that would begin from t1.  If
> > we launch that job before t3, it will correctly restore its state.
> > However, if we launch the job after t3, it will be missing old values,
> > right?
> >
> > Unless I'm misunderstanding something, the only way around this is to
> > keep snapshots in addition to the changelog.  Has there been any
> > discussion of providing an option in Samza of taking RocksDB snapshots
> > and persisting them to an object store or HDFS?
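> >
> > For illustration, the snapshot side of that could potentially be built on
> > RocksDB's checkpoint feature, roughly like the sketch below. The paths are
> > made up, and the copy to HDFS or an object store is left out:
> >
> >   import org.rocksdb.Checkpoint;
> >   import org.rocksdb.Options;
> >   import org.rocksdb.RocksDB;
> >
> >   public class RocksDbSnapshotSketch {
> >     public static void main(String[] args) throws Exception {
> >       RocksDB.loadLibrary();
> >       try (Options options = new Options().setCreateIfMissing(true);
> >            RocksDB db = RocksDB.open(options, "/tmp/my-store")) {
> >         // createCheckpoint() hard-links the live SST files into a new
> >         // directory, giving a consistent point-in-time copy of the store.
> >         Checkpoint checkpoint = Checkpoint.create(db);
> >         checkpoint.createCheckpoint("/tmp/my-store-snapshot-t1");
> >         // That directory (plus the changelog offset it corresponds to)
> >         // could then be shipped to HDFS or an object store.
> >       }
> >     }
> >   }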
> >
> > Thanks,
> >
> > Roger
> >
>
