Thanks for the input, Jay. From my understanding, your question boils down to how fuzzy the stop point can/should be, and what guarantees we want to provide to the user. This has multiple dimensions:
1. Using a timestamp has one main issue: if an input topic has no data at or beyond that timestamp, the application never finishes (ie, the stop timestamp is in "the future"). Furthermore, it would require the user to specify the timestamp, because if we used the current system time (ie, at startup) as the stop timestamp, the "future stop-timestamp" case might be very common (think of a topic with no active producers -- it's batch processing). Thus, the user needs to know the stop timestamp, which might be hard for her to figure out -- in the current design it's way simpler for the user to activate "auto stop". Last but not least, assume an application with two subtopologies that are connected via an intermediate topic and executed in different JVMs. The first subtopology could filter a lot of messages, so it might never write a record with timestamp >= stop-timestamp into the intermediate topic. Thus, even if the first JVM terminates, the second would never stop automatically, because it never reads a record with timestamp >= stop-timestamp (see the consumer-loop sketch below). There would be some workaround if we shut down in a "fuzzy" way, ie, with no guarantee which records actually get processed (ie, stop processing somewhat early in some cases). But I will argue in (3) why this "stop roughly around this time" semantic is not a good idea.

2. I was not aware of a metadata field for committed offsets, and it sounds quite appealing. However, thinking about it in more detail, I have my doubts we can use it: if we want to propagate stop offsets for intermediate topics, all producer instances would need to update this metadata field and thus need to commit (a producer that commits? Well, we could use the "restore consumer" with manual partition assignment for this) -- however, these commits would conflict not only with the commits of the actual consumer, but also with each other across all running producers (see the offset-metadata sketch below).

3. This is the overall "how fuzzy do we want to be" discussion. I would argue that we should provide fairly strong stop consistency. Assume the following use case: an external application generates data in batches and writes files to HDFS. Those files are imported into Kafka via Connect. Each time a batch of data gets inserted into Kafka, this data should be processed by a Streams application. If we cannot guarantee that all data of this batch is fully processed and the result is complete, the user experience would be quite bad. Furthermore, we want to guard a running batch job against processing "too much" data: assume the same scenario as before with HDFS + Connect. For whatever reason, a Streams batch job takes longer than usual, and while it is running, new data is appended to the topic because new files (of the next batch) are already available. We don't want to process this data in the currently running app, but want it to be included in the next batch run (you could imagine that each batch job writes its result into a different output topic). Thus, we want all data to be processed completely, ie, we should provide strong start-stop consistency.
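To make the "future stop-timestamp" problem from (1) concrete, here is a minimal consumer-loop sketch using the plain consumer API (not actual Streams internals; the topic name, group id, and process() helper are made up for illustration):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StopTimestampSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "batch-app");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // User-supplied stop point. If it lies in "the future" relative to
        // the topic's data, the loop below never observes a qualifying record.
        long stopTimestamp = Long.parseLong(args[0]);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("intermediate-topic"));

            boolean done = false;
            while (!done) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    if (record.timestamp() >= stopTimestamp) {
                        done = true; // stop point reached for this partition
                        break;
                    }
                    process(record);
                }
                // If the upstream subtopology filters out every record with
                // timestamp >= stopTimestamp, the condition above never fires
                // and this loop runs forever.
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // application logic
    }
}
```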
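And for (2), an offset-metadata sketch: the consumer's OffsetAndMetadata can carry an application-defined string along with the committed offset, but each commit for a partition overwrites the previous metadata, which is exactly the conflict described above (again plain consumer API; the topic name, offsets, and the stopOffset encoding are invented for illustration):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetMetadataSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "batch-app");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("intermediate-topic", 0);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual partition assignment, as the "restore consumer" would use.
            consumer.assign(Collections.singleton(tp));

            // Attach an application-defined string to the committed offset.
            // Note: every commit for this partition overwrites the previous
            // metadata, so the actual consumer's commits and commits from
            // other producer instances would race on this single field.
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(42L, "stopOffset=100")));

            // Read the metadata back.
            OffsetAndMetadata committed =
                consumer.committed(Collections.singleton(tp)).get(tp);
            System.out.println(committed.metadata()); // prints: stopOffset=100
        }
    }
}
```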
Store the "stopping timestamp" in the metadata field associated with > the existing offset storage mechanism > 3. Don't worry about fully processing the entire DAG. After all, > partially processing a tuple isn't much different from not processing it, > and in any case the stopping point is a heuristic so no point in being > overly precise here. > > Probably I'm missing something, though, I haven't thought through the > implications of using time instead of offset. > > -Jay > > On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > >> Hi all, >> >> I want to start a discussion about KIP-95: >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP- >> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams >> >> Looking forward to your feedback. >> >> >> -Matthias >> >> >> >