Thank you both for your replies. This is incredibly helpful.

Guozhang, in (2) above did you mean "some keys *may be* hashed to different
partitions and the existing local state stores will not be valid"? That
fits with our understanding.

As to your caveats in (3) and (4), we are trying to be sure that our
datastore will be "loaded" properly before we begin processing. Would you
say the promise, when we request a store value for the key given in
process(key, ...), is that we get the most up-to-date value for that key?
Does this promise hold if we restart the app, or if we create a new app
consuming the same local store? I believe that's the case but want to
double-check.
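
For concreteness, here is roughly what our processors do (a minimal
sketch; the store name, types, and merge logic are stand-ins for our real
code):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class MergingProcessor implements Processor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // "my-store" is a placeholder; the store is attached to this
        // processor when we build the topology.
        store = (KeyValueStore<String, String>) context.getStateStore("my-store");
    }

    @Override
    public void process(String key, String value) {
        // We only read/write the key we were handed, and we rely on
        // store.get(key) returning the latest value previously written for
        // this key -- including values written before a restart.
        String previous = store.get(key);
        store.put(key, previous == null ? value : previous + "," + value);
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {}
}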

On Wed, Jul 20, 2016 at 1:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Michael,
>
> 1. Kafka Streams always tries to colocate the local stores with the
> processing nodes based on the partition key. For example, suppose you want
> to do an aggregation based on key K1, but the input topic is not keyed on
> K1 and hence not partitioned by it. The library will then auto-repartition
> through an intermediate topic keyed on K1, to make sure that the local
> stores holding the aggregates for K1 are co-located with the processor
> that gets the partitions hashed on K1.
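>
> As a rough sketch in the DSL (exact method names depend on the Streams
> version you are on, and the topic, store, and field names here are all
> made up):
>
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> orders = builder.stream("orders");
>
> // "orders" is keyed by order id, but we want counts per user id (the K1
> // above). Re-keying via groupBy forces a repartition through an
> // intermediate topic keyed on user id before the local store
> // "orders-per-user" is materialized, so store and processor stay
> // colocated.
> KTable<String, Long> ordersPerUser = orders
>     .groupBy((orderId, order) -> extractUserId(order))  // extractUserId: your own logic
>     .count("orders-per-user");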
>
> 2. When you increase the number of input partitions and hence number of
> processors / local stores, however, some keys may not be hashed to
> different partitions and the existing local state stores will not be valid
> in this case. In practice, we recommend that users over-partition their
> input topics (note that multiple partitions can be assigned to the same
> processor) so that when they increase the number of Streams instances
> later, they do not need to add more partitions.
>
> 3. If you change your code, the existing state stores may again no longer
> be valid (note that colocation is still guaranteed), depending on how you
> modified the computational logic. In this case you can either treat the
> new code as a new application with a different application id, so that
> everything is rebuilt from scratch, or "wipe out" the existing,
> now-invalid processing state; we have provided some tools for this
> purpose and are writing a tutorial about how to do "reprocessing" in
> general.
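>
> For example, switching to a new application id is just a config change (a
> minimal sketch; the id and broker address are placeholders, and "builder"
> is your existing topology builder):
>
> Properties props = new Properties();
> // A new application.id means a fresh consumer group, fresh changelog
> // topics, and fresh local state directories, i.e. a restart from scratch.
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-v2");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
>
> KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
> streams.start();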
>
> 4. About bootstrapping: currently Kafka Streams does not have a "bootstrap
> stream" concept whereby one stream is processed completely before any
> other streams. Instead, we currently rely on record timestamps to
> "synchronize streams" (similar to the MessageChooser functionality in
> Samza); you can find more details here:
>
>
> http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
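>
> Since that synchronization is driven by the extracted timestamps, you can
> plug in your own extractor when the event time lives in the payload (a
> sketch; MyEvent and getEventTime() are made-up placeholders):
>
> public class PayloadTimeExtractor implements TimestampExtractor {
>     @Override
>     public long extract(ConsumerRecord<Object, Object> record) {
>         if (record.value() instanceof MyEvent) {
>             return ((MyEvent) record.value()).getEventTime();
>         }
>         // Fall back to the timestamp set by the producer / broker.
>         return record.timestamp();
>     }
> }
>
> // Registered via config:
> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>           PayloadTimeExtractor.class.getName());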
>
> And we are currently working on finer-grained flow-control mechanisms as
> well:
>
> https://issues.apache.org/jira/browse/KAFKA-3478
>
>
> Hope these help.
>
> Guozhang
>
>
>
> On Wed, Jul 20, 2016 at 12:36 PM, Eno Thereska <eno.there...@gmail.com>
> wrote:
>
> > Hi Michael,
> >
> > These are good questions, and I can confirm that the system works the
> > way you hope it does, provided you use the DSL and don't make up keys
> > arbitrarily. That said, there is currently nothing that prevents you
> > from shooting yourself in the foot, e.g., by making up keys and using
> > the same made-up key in different processing nodes.
> >
> > However, if you use the Kafka Streams primitives, such bad situations
> > are not supposed to happen.
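> >
> > To make the foot-gun concrete, inside a Processor (store and key names
> > are made up):
> >
> > public void process(String key, String value) {
> >     // Safe: this key belongs to the partition this instance owns.
> >     store.put(key, value);
> >
> >     // Risky: another instance, handling a different partition, may
> >     // write the same made-up key into its own local store, and the two
> >     // copies will never see each other.
> >     store.put("made-up-key", value);
> > }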
> >
> > Thanks
> > Eno
> >
> > > On 18 Jul 2016, at 11:28, Michael Ruberry <michae...@taboola.com> wrote:
> > >
> > > Hi all,
> > >
> > > My company, Taboola, has been looking at Kafka Streams and we are still
> > > confused about some details of partitions and store persistence.
> > >
> > > So let's say we have an app consuming a single topic and we're using an
> > > embedded and persisted key-value store:
> > >
> > >
> > >   1. If we restart the app, will each node finish loading values from
> > >   the store before it begins processing? (Analogous to Samza's
> > >   bootstrap streams?)
> > >   2. How are the store's entries partitioned among the nodes? Is there
> > >   a promise that if an entry in the store and a message in the stream
> > >   have the same key, they will be colocated on the same node?
> > >   3. Another way of putting the above question: assuming we are using
> > >   the Processor API, if in process(key, value) we only update the store
> > >   for the current key, are we guaranteed that across restarts the prior
> > >   data written for each key will be available to the node processing
> > >   that key?
> > >   4. Do the answers to 2 and 3 change if the number of nodes varies
> > >   across restarts?
> > >   5. Do the answers to 2, 3, and 4 change if there are code changes
> > >   across restarts?
> > >   6. If we write an arbitrary key in process(), what will happen? What
> > >   if two nodes write the same key in the same store? In general, should
> > >   we only be accessing keys that correspond to the key given in
> > >   process()?
> > >
> > >
> > > We hope the system works like this:
> > >
> > >
> > >   1. Data from the data store acts like a bootstrap stream in Samza:
> > >   on restart it is distributed to nodes according to the data's keys
> > >   BEFORE any other processing is performed
> > >   2. If two items, whether from a topic or a data store, have the same
> > >   key, then they are processed on the same node
> > >
> > >
> > > These are the critical features we're looking for, as we think they
> > > will allow us to (1) adjust the number of nodes and make code changes
> > > without worrying about data loss, (2) guarantee that data and processing
> > > are properly colocated, and (3) easily coordinate multiple topics and
> > > data stores.
> > >
> > > Looking forward to your responses!
> >
> >
>
>
> --
> -- Guozhang
>
