Michael,

> Guozhang, in (2) above did you mean "some keys *may be* hashed to different
> partitions and the existing local state stores will not be valid?" That
> fits with our understanding.
Yes, that's what Guozhang meant. Corrected version:

"When you increase the number of input partitions and hence the number of
processors / local stores, however, some keys may be hashed to different
partitions and the existing local state stores will not be valid in this
case."

[...]

Hope this helps,
Michael

On Wed, Jul 20, 2016 at 11:13 PM, Michael Ruberry <michae...@taboola.com> wrote:

> Thank you both for your replies. This is incredibly helpful.
>
> Guozhang, in (2) above did you mean "some keys *may be* hashed to different
> partitions and the existing local state stores will not be valid?" That
> fits with our understanding.
>
> As to your caveats in (3) and (4), we are trying to be sure that our
> datastore will be "loaded" properly before we begin processing. Would you
> say the promise, when we request a store value for a key given in
> process(key, ...), is that we get the most up-to-date value for that key?
> Is this promise true if we restart the app or create a new app consuming
> the same local store? I believe that's the case but want to double-check.
>
> On Wed, Jul 20, 2016 at 1:14 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Michael,
> >
> > 1. Kafka Streams always tries to colocate the local stores with the
> > processing nodes based on the partition key. For example, suppose you
> > want to do an aggregation based on key K1, but the input topic is not
> > keyed on K1 and hence not partitioned on it. The library will then
> > auto-repartition into an intermediate topic based on K1, to make sure
> > that the local stores holding the aggregates based on K1 are co-located
> > with the processor that gets the partitions hashed on K1 as well.
> >
> > 2. When you increase the number of input partitions and hence the number
> > of processors / local stores, however, some keys may not be hashed to
> > different partitions and the existing local state stores will not be
> > valid in this case.
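To make the remapping concrete, here is a quick sketch in plain Python (not Kafka code; `zlib.crc32` and the `partition_for` helper are stand-ins for illustration, since the actual default partitioner in the Java client hashes the serialized key with murmur2):

```python
# Sketch: why growing the partition count invalidates local stores.
# zlib.crc32 is a stand-in hash, used only to show the modulo effect.
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    return zlib.crc32(key.encode("utf-8")) % num_partitions

keys = ["user-%d" % i for i in range(10)]
before = {k: partition_for(k, 4) for k in keys}  # topic with 4 partitions
after = {k: partition_for(k, 6) for k in keys}   # after adding partitions: 6

# Keys whose partition changed: their aggregates now sit in a local store
# that is no longer co-located with the processor that receives them.
moved = [k for k in keys if before[k] != after[k]]
print("keys that moved:", moved)
```

With the modulus changing from 4 to 6, most keys typically land on a different partition, which is exactly why the existing local state stores stop being valid.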
> > In practice, we recommend that users over-partition their input topics
> > (note that multiple partitions can be assigned to the same processor),
> > so that when they increase the number of Streams instances later they
> > do not need to add more partitions.
> >
> > 3. If you change your code, again the existing state stores may not be
> > valid anymore (note that colocation is still guaranteed), depending on
> > how you modified the computational logic. In this case you can either
> > treat the new code as a new application with a different application id,
> > so that everything is restarted from scratch, or you can "wipe out" the
> > existing invalid processing state; we have provided some tools for this
> > purpose and are writing a tutorial about how to do "re-processing" in
> > general.
> >
> > 4. About bootstrapping: currently Kafka Streams does not have a
> > "bootstrap stream" concept such that one stream can be processed
> > completely before processing other streams. Instead, we currently rely
> > on the record timestamps to "synchronize streams" (similar to the
> > message chooser functionality in Samza); you can find more details here:
> >
> > http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps
> >
> > And we are currently working on finer flow-control mechanisms as well:
> >
> > https://issues.apache.org/jira/browse/KAFKA-3478
> >
> > Hope these help.
> >
> > Guozhang
> >
> > On Wed, Jul 20, 2016 at 12:36 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> > > Hi Michael,
> > >
> > > These are good questions, and I can confirm that the system works in
> > > the way you hope it works, if you use the DSL and don't make up keys
> > > arbitrarily. In other words, there is currently nothing that prevents
> > > you from shooting yourself in the foot, e.g., by making up keys and
> > > using the same made-up key in different processing nodes.
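Eno's caveat can be pictured with a toy model (plain Python with made-up names, not the Kafka Streams API): each task owns the local store for its partition, so state written under the key handed to process() stays colocated, while a made-up key can land state on a task that no future record for that key will ever reach:

```python
# Toy model of partition-local state stores; not Kafka Streams code.
import zlib

NUM_PARTITIONS = 3
stores = [dict() for _ in range(NUM_PARTITIONS)]  # one local store per task

def partition_of(key: str) -> int:
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def process(task_id: int, key: str, value: str) -> None:
    # Safe pattern: update only the key this task was handed.
    stores[task_id][key] = value

# Records are routed to tasks by key, so store updates stay colocated:
for key, value in [("k1", "a"), ("k2", "b"), ("k3", "c")]:
    process(partition_of(key), key, value)

# Every key's state lives on the task that future records for it reach:
for key in ("k1", "k2", "k3"):
    assert key in stores[partition_of(key)]

# Unsafe pattern: task 0 writes a made-up key "k-other"; if that key
# hashes to a different task, the writing task holds orphaned state and
# the owning task never sees it.
process(0, "k-other", "orphaned?")
```

This is only a sketch of the hashing argument, not of the runtime; the point is that the colocation guarantee holds precisely when writes are restricted to the key passed into process().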
> > > However, if you use the Kafka Streams primitives, then such bad
> > > situations are not supposed to happen.
> > >
> > > Thanks,
> > > Eno
> > >
> > > > On 18 Jul 2016, at 11:28, Michael Ruberry <michae...@taboola.com> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > My company, Taboola, has been looking at Kafka Streams and we are
> > > > still confused about some details of partitions and store
> > > > persistence.
> > > >
> > > > So let's say we have an app consuming a single topic, and we're
> > > > using an embedded and persisted key-value store:
> > > >
> > > > 1. If we restart the app, will each node finish loading values from
> > > > the store before it begins processing? (Analogous to Samza's
> > > > bootstrap streams?)
> > > > 2. How are the store's entries partitioned among the nodes? Is
> > > > there a promise that if an entry in the store and a record in the
> > > > stream have the same key, they will be colocated on the same node?
> > > > 3. Another way of putting the above question: assuming we are using
> > > > the Processor API, if in process(key, value) we only update the
> > > > store for the current key, are we guaranteed that across restarts
> > > > the prior data written for each key will be available to the node
> > > > processing that key?
> > > > 4. Do the answers to 2 and 3 change if the number of nodes varies
> > > > across restarts?
> > > > 5. Do the answers to 2, 3 and 4 change if there are code changes
> > > > across restarts?
> > > > 6. If we write an arbitrary key in process(), what will happen?
> > > > What if two nodes write the same key in the same store? In general,
> > > > should we only be accessing keys that correspond to the key given
> > > > in process()?
> > > >
> > > > We hope the system works like this:
> > > >
> > > > 1.
> > > > Data from the data store does act like a bootstrap stream in
> > > > Samza, and on restart it is distributed to nodes according to the
> > > > data's keys BEFORE any other processing is performed.
> > > > 2. If two items, whether from a topic or a data store, have the
> > > > same key, then they are processed on the same node.
> > > >
> > > > These are the critical features we're looking for, as we think
> > > > they will allow us to (1) adjust the number of nodes and make code
> > > > changes without worrying about data loss, (2) guarantee that data
> > > > and processing are properly colocated, and (3) easily coordinate
> > > > multiple topics and data stores.
> > > >
> > > > Looking forward to your responses!
> >
> > --
> > -- Guozhang

--
Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
Download Apache Kafka and Confluent Platform: www.confluent.io/download