Hi Michael, 1. Kafka Streams always tries to colocate the local stores with the processing nodes based on the partition key. For example, if you want to do an aggregation based on key K1, but the input topic is not keyed on K1 and hence not partitioned on that. The library then will auto-repartition into an intermediate topic based on K1 to make sure that the local stores used for storing the aggregates based on K1 will be co-located with the processor that gets partitions hashed on K1 as well.
2. When you increase the number of input partitions and hence number of processors / local stores, however, some keys may not be hashed to different partitions and the existing local state stores will not be valid in this case. In practice, we recommend users to over-partition their input topics (note that multiple partitions can be assigned to the same processor) so that when they increase the number of streams instances later, they do not need to add more partitions. 3. If you change your code, again the existing state stores may not be valid (note colocating is still guaranteed) anymore depending on how you modified the computational logic. In this case either you treat the new code as a new application with a different application id so that everything can be restarted from scratch, or you can "wipe out" the existing invalid processing state, for which we have provided some tools for this purpose and are writing a tutorial about how to do "re-processing" in general. 4. About bootstrapping, currently Kafka Streams does not have a "bootstrap stream" concept so that it can be processed completely before processing other streams. Instead, we are currently relying on using the record timestamp to "synchronize streams" (similar to the message chooser functionality in Samza) and you can find more details here: http://docs.confluent.io/3.0.0/streams/architecture.html#flow-control-with-timestamps And we are currently working on having finer flow control mechanisms as well: https://issues.apache.org/jira/browse/KAFKA-3478 Hope these help. Guozhang On Wed, Jul 20, 2016 at 12:36 PM, Eno Thereska <eno.there...@gmail.com> wrote: > Hi Michael, > > These are good questions and I can confirm that the system works in the > way you hope it works, if you use the DSL and don't make up keys > arbitrarily. In other words, there is nothing currently that prevents you > from shooting yourself in the foot e.g., by making up keys and using the > same made-up key in different processing nodes. > > However, if you use the Kafka Streams primitives, then such bad situations > are not supposed to happen. > > Thanks > Eno > > > On 18 Jul 2016, at 11:28, Michael Ruberry <michae...@taboola.com> wrote: > > > > Hi all, > > > > My company, Taboola, has been looking at Kafka Streams and we are still > > confused about some details of partitions and store persistence. > > > > So let's say we have an app consuming a single topic and we're using an > > embedded and persisted key-value store: > > > > > > 1. If we restart the app, will each node finish loading values from the > > store before it begins processing? (Analogous to Samza's boot-strap > > streams?) > > 2. How are are the store's entries partitioned among the nodes? Is > there > > a promise that if an entry in the store and the stream have the same > key > > that they will be colocated on the same node? > > 3. Another way of putting the above question: assuming we are using the > > Processor API, if in process (key, value) we only update the store on > the > > current key, are we guaranteed that across restarts the prior data > written > > for each key will be available to the node processing that key? > > 4. Does the answer(s) to 2 and 3 change if the number of nodes varies > > across restarts? > > 5. Does the answer(s) to 2,3 and 4 change if there are code changes > > across restarts? > > 6. If we write an arbitrary key in process(), what will happen? What if > > two nodes write the same key in the same store? In general, should we > only > > be accessing keys that correspond to the key given in process()? > > > > > > We hope the system works like this: > > > > > > 1. Data from the data store does act like a boot-strap stream in Samza, > > and on restart is distributed to nodes according to the data's keys > BEFORE > > any other processing is performed > > 2. If two items, whether from a topic or a data store, have the same > > key, then they are processed on the same node > > > > > > These are the critical features we're looking for, as we think they will > > allow us to (1) adjust the number of nodes and make code changes without > > worrying about data loss, (2) guarantee that data and processing are > > properly colocated, and (3) easily coordinate multiple topics and data > > stores. > > > > Looking forward to your responses! > > -- -- Guozhang