I will read through the KIP doc once again to provide more detailed feedback, but let me throw in my two cents just on the above email.

There are a few motivations to have a "consistent" stop-point across tasks belonging to different sub-topologies. One of them is interactive queries: say you have two state stores belonging to two sub-topologies; if they stopped at different points, then users querying them will see inconsistent answers (think of the example people always use in databases: the stores represent A's and B's bank accounts, and a record is processed to move X dollars from A to B).

As for the implementation to support such consistent stop-points, I think the metadata field in the offsets topic is worth exploring, because Streams may very likely use the transactional APIs proposed in KIP-98 to let the producers, not the consumers themselves, send offsets in a transactional manner (details can be found here: https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8 ).
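
To make that a bit more concrete, here is a minimal sketch of the flow; the API names follow the KIP-98 draft plus the existing consumer OffsetAndMetadata class, and the topic names, the transactional id, and the metadata encoding are purely illustrative, not what Streams would actually do:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class TransactionalStopPointSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "streams-task-0_0");   // hypothetical per-task id
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();

            producer.beginTransaction();
            // write the processing results ...
            producer.send(new ProducerRecord<>("output-topic", "someKey", "someValue"));

            // ... and commit the consumed input offsets in the same transaction;
            // the metadata string could carry the agreed-upon stop point
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    new TopicPartition("input-topic", 0),
                    new OffsetAndMetadata(42L, "stopOffset=100"));  // illustrative encoding
            producer.sendOffsetsToTransaction(offsets, "my-streams-app");

            producer.commitTransaction();
        }
    }
}

Since the offsets would be sent through the producer as part of the transaction, any stop-point metadata would become visible atomically with the committed offsets and the produced output.
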
Guozhang


On Tue, Dec 6, 2016 at 2:45 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the input Jay.
>
> From my understanding, your question boils down to how fuzzy the stop
> point can/should be, and what guarantees we want to provide to the user.
> This has multiple dimensions:
>
>
> 1. The main issue with using a timestamp is that if an input topic has no
> data with this timestamp, the application never finishes (ie, the stop
> timestamp is in "the future").
>
> Furthermore, it would require the user to specify the timestamp, because
> if we use the current (ie, on startup) system time as stop-timestamp, the
> case of a "future stop-timestamp" might be very common (think of no active
> producers for a topic -- it's batch processing). Thus, the user needs to
> know the stop-timestamp, which might be hard for her to figure out -- in
> the current design it's way simpler for the user to activate "auto stop".
>
> Last but not least, assume an application with two subtopologies that
> are connected via an intermediate topic and both subtopologies are
> executed in different JVMs. The first topology could filter a lot of
> messages, and thus it might happen that it never writes a record with
> timestamp >= stop-timestamp into the intermediate topic. Thus, even if
> the first JVM terminates, the second would not stop automatically, as it
> never reads a record with timestamp >= stop-timestamp.
>
> There would be some workaround if we shut down in a "fuzzy" way, ie,
> with no guarantee which records actually get processed (ie, stop
> processing somewhat early in some cases). But I will argue in (3) why
> this "stop roughly about this time" semantic is not a good idea.
>
>
> 2. I was not aware of a metadata field for committed offsets, and this
> sounds quite appealing. However, thinking about it in more detail, I
> have my doubts we can use it:
>
> If we want to propagate stop-offsets for intermediate topics, all
> producer instances would need to update this metadata field and thus
> need to commit (a producer that commits? Well, we could use the "restore
> consumer" with manual partition assignment for this.) -- however, this
> would not only conflict with the commits of the actual consumer, but
> also with the commits of all the other running producers.
>
>
> 3. This is the overall "how fuzzy we want to be" discussion. I would
> argue that we should provide somewhat strong stop consistency. Assume
> the following use case: an external application generates data in
> batches and writes files to HDFS. Those files are imported into Kafka
> via Connect. Each time a batch of data gets inserted into Kafka, this
> data should be processed with a Streams application. If we cannot
> guarantee that all data of this batch is fully processed and the result
> is complete, the user experience would be quite bad.
>
> Furthermore, we want to guard a running batch job against processing
> "too much" data: assume the same scenario as before with HDFS + Connect.
> For whatever reason, a Streams batch job takes longer than usual, and
> while it is running new data is appended to the topic because new files
> (of the next batch) are already available. We don't want to process this
> data with the currently running app, but want it to be included in the
> next batch run (you could imagine that each batch job writes its result
> into a different output topic). Thus, we want to have all data processed
> completely, ie, provide strong start-stop consistency.
>
>
> Sorry for the quite long answer. Hope it is convincing though :)
>
>
> -Matthias
>
>
> On 12/5/16 4:44 PM, Jay Kreps wrote:
> > I'd like to second the discouragement of adding a new topic per job. We
> > went down this path in Samza, and I think the result was quite a mess.
> > You had to read the full topic every time a job started, so it added a
> > lot of overhead and polluted the topic space.
> >
> > What if we did the following:
> >
> >    1. Use timestamp instead of offset
> >    2. Store the "stopping timestamp" in the metadata field associated
> >       with the existing offset storage mechanism
> >    3. Don't worry about fully processing the entire DAG. After all,
> >       partially processing a tuple isn't much different from not
> >       processing it, and in any case the stopping point is a heuristic,
> >       so no point in being overly precise here.
> >
> > Probably I'm missing something, though; I haven't thought through the
> > implications of using time instead of offset.
> >
> > -Jay
> >
> > On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi all,
> >>
> >> I want to start a discussion about KIP-95:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>
> >> Looking forward to your feedback.
> >>
> >>
> >> -Matthias
> >>
> >


-- 
-- Guozhang
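
For reference, the "metadata field" discussed in the thread is the free-form string that can be attached to every offset commit. Below is a minimal, stand-alone sketch of storing and reading back a stop marker through it with the plain Java consumer; the group and topic names and the stopTimestamp encoding are purely illustrative, not part of any proposal.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class StopMarkerMetadataSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-streams-app");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("input-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            // commit the current position together with a stop marker,
            // piggy-backed on the metadata string of the offsets topic
            long position = 42L;                                  // hypothetical position
            String stopMarker = "stopTimestamp=1480980000000";    // illustrative encoding
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(position, stopMarker)));

            // a (re)started instance can read the stop marker back before processing
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println("resume at " + committed.offset()
                    + ", stop marker: " + committed.metadata());
        }
    }
}

Whether such a marker holds a stop timestamp (as Jay suggests) or per-partition stop offsets (as discussed elsewhere in the thread) is orthogonal to the mechanism shown here.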