Michael,

> Guozhang, in (2) above did you mean "some keys *may be* hashed to
> different partitions and the existing local state stores will not be
> valid?" That fits with our understanding.

Yes, that's what Guozhang meant.

Corrected version:

    When you increase the number of input partitions and hence number of
    processors / local stores, however, some keys may be hashed to
    different partitions and the existing local state stores will not be
    valid in this case. [...]
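
To see why, it helps to remember how a key is mapped to a partition. The
sketch below is only an illustration (Kafka's default partitioner actually
runs murmur2 over the serialized key), but the "hash modulo partition count"
idea is the same, and it shows why growing the partition count can move a key:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class PartitionSketch {

        // Simplified stand-in for Kafka's DefaultPartitioner, which uses
        // murmur2 on the serialized key rather than Arrays.hashCode.
        static int partitionFor(byte[] keyBytes, int numPartitions) {
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }

        public static void main(String[] args) {
            byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
            // The same key can map to a different partition once the topic
            // grows from 4 to 8 partitions, so local state built under the
            // old layout no longer lines up with the new assignment.
            System.out.println(partitionFor(key, 4));
            System.out.println(partitionFor(key, 8));
        }
    }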


Hope this helps,
Michael



On Wed, Jul 20, 2016 at 11:13 PM, Michael Ruberry <michae...@taboola.com>
wrote:

> Thank you both for your replies. This is incredibly helpful.
>
> Guozhang, in (2) above did you mean "some keys *may be* hashed to different
> partitions and the existing local state stores will not be valid?" That
> fits with our understanding.
>
> As to your caveats in (3) and (4), we are trying to be sure that our
> datastore will be "loaded" properly before we begin processing. Would you
> say the promise when we request a store value for a key given in
> process(key, ...) is that we get the most up-to-date value for that key? Is
> this promise true if we restart the app or create a new app consuming the
> same local store? I believe that's the case but want to double check now.
>
> On Wed, Jul 20, 2016 at 1:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Michael,
> >
> > 1. Kafka Streams always tries to colocate the local stores with the
> > processing nodes based on the partition key. For example, suppose you
> > want to do an aggregation based on key K1, but the input topic is not
> > keyed on K1 and hence not partitioned on it. The library will then
> > auto-repartition into an intermediate topic based on K1, to make sure
> > that the local stores holding the aggregates based on K1 are co-located
> > with the processor that gets the partitions hashed on K1.
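> >
> > For illustration, here is roughly what that looks like in the DSL (the
> > topic name, the value type and the extractUserId helper are made up, and
> > the exact method names have shifted a bit across releases, so treat this
> > as a sketch of the idea rather than copy-paste code):
> >
> >     import org.apache.kafka.streams.StreamsBuilder;
> >     import org.apache.kafka.streams.kstream.KStream;
> >     import org.apache.kafka.streams.kstream.KTable;
> >
> >     StreamsBuilder builder = new StreamsBuilder();
> >     // "purchases" is keyed on something other than the user id (K1)
> >     KStream<String, String> purchases = builder.stream("purchases");
> >
> >     KTable<String, Long> perUser = purchases
> >         // re-key on K1 (extractUserId is a hypothetical helper)
> >         .selectKey((oldKey, value) -> extractUserId(value))
> >         // grouping after a key change makes the library write through
> >         // an internal repartition topic keyed on K1 ...
> >         .groupByKey()
> >         // ... so the store backing this count is colocated with the
> >         // processors that own the K1-hashed partitions
> >         .count();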
> >
> > 2. When you increase the number of input partitions and hence number of
> > processors / local stores, however, some keys may not be hashed to
> > different partitions and the existing local state stores will not be
> > valid in this case. In practice, we recommend users to over-partition
> > their input topics (note that multiple partitions can be assigned to the
> > same processor) so that when they increase the number of streams
> > instances later, they do not need to add more partitions.
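> >
> > For example, you could create the input topic with many more partitions
> > than you have instances today. The sketch below uses the Java
> > AdminClient, which is from a newer Kafka release than the 0.10.0
> > discussed in this thread (with 0.10.0 you would use the kafka-topics
> > command line tool), and the topic name and counts are placeholders:
> >
> >     import java.util.Collections;
> >     import java.util.Properties;
> >     import org.apache.kafka.clients.admin.AdminClient;
> >     import org.apache.kafka.clients.admin.AdminClientConfig;
> >     import org.apache.kafka.clients.admin.NewTopic;
> >
> >     Properties props = new Properties();
> >     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >     try (AdminClient admin = AdminClient.create(props)) {
> >         // 50 partitions even though only a handful of instances run
> >         // today; the extra partitions leave headroom to add instances
> >         // later without having to repartition existing state
> >         admin.createTopics(Collections.singleton(
> >                 new NewTopic("purchases", 50, (short) 3))).all().get();
> >     }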
> >
> > 3. If you change your code, again the existing state stores may not be
> > valid anymore (note that colocation is still guaranteed), depending on
> > how you modified the computational logic. In this case you can either
> > treat the new code as a new application with a different application id,
> > so that everything is restarted from scratch, or you can "wipe out" the
> > existing invalid processing state; we have provided some tools for this
> > purpose and are writing a tutorial about how to do "re-processing" in
> > general.
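> >
> > Concretely, the "new application" route is just a matter of giving the
> > changed code a fresh application id in its configuration (the id and
> > bootstrap servers below are placeholders):
> >
> >     import java.util.Properties;
> >     import org.apache.kafka.streams.StreamsConfig;
> >
> >     Properties config = new Properties();
> >     // A new application.id means a new consumer group and new internal
> >     // topics, so all state is rebuilt from the source topics instead of
> >     // being (incorrectly) reused from the old stores.
> >     config.put(StreamsConfig.APPLICATION_ID_CONFIG, "purchase-counts-v2");
> >     config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");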
> >
> > 4. About bootstrapping, Kafka Streams currently does not have a
> > "bootstrap stream" concept that would let one stream be processed
> > completely before any other streams. Instead, we rely on the record
> > timestamps to "synchronize streams" (similar to the message chooser
> > functionality in Samza); you can find more details here:
> >
> > http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> >
> > And we are currently working on having finer flow control mechanisms as
> > well:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3478
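> >
> > If your records carry an embedded event time, you can plug your own
> > extractor into that synchronization logic. A sketch (MyEvent and its
> > accessor are made up, and the TimestampExtractor signature has changed
> > slightly in later releases):
> >
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >     import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> >     // Hypothetical extractor pulling event time out of the payload
> >     // (0.10.0-era signature; newer releases also pass a previous
> >     // timestamp as a second argument).
> >     public class MyEventTimeExtractor implements TimestampExtractor {
> >         @Override
> >         public long extract(ConsumerRecord<Object, Object> record) {
> >             return ((MyEvent) record.value()).eventTimeMs();
> >         }
> >     }
> >
> >     // Registered via the streams configuration:
> >     // config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> >     //            MyEventTimeExtractor.class.getName());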
> >
> >
> > Hope these help.
> >
> > Guozhang
> >
> >
> >
> > On Wed, Jul 20, 2016 at 12:36 PM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> > > Hi Michael,
> > >
> > > These are good questions, and I can confirm that the system works the
> > > way you hope it works, provided you use the DSL and don't make up keys
> > > arbitrarily. That said, there is currently nothing that prevents you
> > > from shooting yourself in the foot, e.g. by making up keys and using
> > > the same made-up key in different processing nodes.
> > >
> > > However, as long as you stick to the Kafka Streams primitives, such bad
> > > situations are not supposed to happen.
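> > >
> > > To make that concrete, here is a sketch with the low-level Processor
> > > API (the store name and the types are made up; the interface shown is
> > > the 0.10.0-era one): only read and write the key that process() was
> > > invoked with, so the data stays with its partition.
> > >
> > >     import org.apache.kafka.streams.processor.Processor;
> > >     import org.apache.kafka.streams.processor.ProcessorContext;
> > >     import org.apache.kafka.streams.state.KeyValueStore;
> > >
> > >     public class CountingProcessor implements Processor<String, String> {
> > >
> > >         private KeyValueStore<String, Long> store;
> > >
> > >         @Override
> > >         @SuppressWarnings("unchecked")
> > >         public void init(ProcessorContext context) {
> > >             store = (KeyValueStore<String, Long>) context.getStateStore("counts");
> > >         }
> > >
> > >         @Override
> > >         public void process(String key, String value) {
> > >             // Safe: we only touch the key this record was partitioned on.
> > >             Long current = store.get(key);
> > >             store.put(key, current == null ? 1L : current + 1);
> > >             // store.put("some-made-up-key", ...) is exactly the
> > >             // foot-gun described above: there is no colocation
> > >             // guarantee for an invented key.
> > >         }
> > >
> > >         @Override
> > >         public void punctuate(long timestamp) { }
> > >
> > >         @Override
> > >         public void close() { }
> > >     }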
> > >
> > > Thanks
> > > Eno
> > >
> > > On 18 Jul 2016, at 11:28, Michael Ruberry <michae...@taboola.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > My company, Taboola, has been looking at Kafka Streams and we are
> > > > still confused about some details of partitions and store persistence.
> > > >
> > > > So let's say we have an app consuming a single topic and we're using
> > > > an embedded and persisted key-value store:
> > > >
> > > >
> > > >   1. If we restart the app, will each node finish loading values from
> > > >   the store before it begins processing? (Analogous to Samza's
> > > >   boot-strap streams?)
> > > >   2. How are the store's entries partitioned among the nodes? Is there
> > > >   a promise that, if an entry in the store and the stream have the
> > > >   same key, they will be colocated on the same node?
> > > >   3. Another way of putting the above question: assuming we are using
> > > >   the Processor API, if in process(key, value) we only update the
> > > >   store on the current key, are we guaranteed that across restarts the
> > > >   prior data written for each key will be available to the node
> > > >   processing that key?
> > > >   4. Do the answers to 2 and 3 change if the number of nodes varies
> > > >   across restarts?
> > > >   5. Do the answers to 2, 3, and 4 change if there are code changes
> > > >   across restarts?
> > > >   6. If we write an arbitrary key in process(), what will happen? What
> > > >   if two nodes write the same key in the same store? In general,
> > > >   should we only be accessing keys that correspond to the key given in
> > > >   process()?
> > > >
> > > > We hope the system works like this:
> > > >
> > > >   1. Data from the data store does act like a boot-strap stream in
> > > >   Samza, and on restart is distributed to nodes according to the
> > > >   data's keys BEFORE any other processing is performed
> > > >   2. If two items, whether from a topic or a data store, have the
> > > >   same key, then they are processed on the same node
> > > >
> > > > These are the critical features we're looking for, as we think they
> > > > will allow us to (1) adjust the number of nodes and make code changes
> > > > without worrying about data loss, (2) guarantee that data and
> > > > processing are properly colocated, and (3) easily coordinate multiple
> > > > topics and data stores.
> > > >
> > > > Looking forward to your responses!
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download
<http://www.confluent.io/download>*
