Thanks for the input, Jay.

From my understanding, your question boils down to how fuzzy the stop
point can/should be, and what guarantees we want to provide to the user.
This has multiple dimensions:


1. Using a timestamp has one main issue: if an input topic has no
record with a timestamp >= the stop-timestamp, the application never
finishes (ie, the stop-timestamp is in "the future").

Furthermore, it would require the user to specify the timestamp, because
if we used the current system time (ie, at startup) as stop-timestamp,
the case of a "future stop-timestamp" might be very common (think no
active producers for a topic -- it's batch processing). Thus, the user
needs to know the stop-timestamp, which might be hard for her to figure
out -- in the current design it's way simpler for the user to activate
"auto stop".

Last but not least, assume an application with two subtopologies that
are connected via an intermediate topic, where the two subtopologies are
executed in different JVMs. The first subtopology could filter out a lot
of messages, and thus it might never write a record with
timestamp >= stop-timestamp into the intermediate topic. Thus, even if
the first JVM terminates, the second would not stop automatically, as it
never reads a record with timestamp >= stop-timestamp.
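
To make the two-subtopology case concrete, here is a toy model in plain
Python. It is only a sketch and not Kafka code: a topic partition is
modeled as a list of (timestamp, value) pairs, and run_stage, STOP_TS and
the "noise" filter are names made up for this illustration. It also
assumes that a stage still forwards the record that triggers the stop,
which is the most favorable case for the downstream stage.

```python
# Toy model only: a topic partition is a list of (timestamp, value) pairs.

def run_stage(records, stop_ts, keep=lambda value: True):
    """Consume records in order and forward those that pass `keep`.

    The stage stops once it has read a record with timestamp >= stop_ts.
    That record is still forwarded if it passes the filter (assumption).
    Returns (forwarded_records, stopped).
    """
    forwarded = []
    for ts, value in records:
        if keep(value):
            forwarded.append((ts, value))
        if ts >= stop_ts:
            return forwarded, True   # stop signal observed
    return forwarded, False          # input exhausted, still waiting


STOP_TS = 5
source = [(1, "a"), (3, "b"), (10, "noise")]

# Subtopology 1 (first JVM): reads ts=10, so it stops, but its filter
# drops that record before it reaches the intermediate topic.
intermediate, stage1_stopped = run_stage(
    source, STOP_TS, keep=lambda v: v != "noise")

# Subtopology 2 (second JVM): never reads a record with ts >= STOP_TS.
_, stage2_stopped = run_stage(intermediate, STOP_TS)

# For comparison: without the filter, the stop signal reaches stage 2.
unfiltered, _ = run_stage(source, STOP_TS)
_, unfiltered_stopped = run_stage(unfiltered, STOP_TS)

print(stage1_stopped, stage2_stopped, unfiltered_stopped)
```

In this model the second stage only terminates if the record carrying
the stop signal survives every upstream filter, which the application
logic does not guarantee.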

There would be some workaround if we shut down in a "fuzzy way", ie,
with no guarantees about which records actually get processed (ie, stop
processing somewhat early in some cases). But I will argue in (3) why
such "stop roughly around this time" semantics are not a good idea.


2. I was not aware of a metadata field for committed offsets and this
sounds quite appealing. However, thinking about it in more detail, I
have my doubts we can use it:

If we want to propagate stop-offsets for intermediate topics, all
producer instances would need to update this metadata field, and thus
would need to commit (A producer that does commit? Well, we could use
the "restore consumer" with manual partition assignment for this.) --
however, these commits would conflict not only with the commits of the
actual consumer, but also with each other across all running producers.


3. This is the overall "how fuzzy do we want to be" discussion. I would
argue that we should provide somewhat strong stop consistency. Assume
the following use case: An external application generates data in
batches and writes files to HDFS. Those files are imported into Kafka
via Connect. Each time a batch of data gets inserted into Kafka, this
data should be processed with a Streams application. If we cannot
guarantee that all data of this batch is fully processed and the result
is complete, the user experience would be quite bad.

Furthermore, we want to guard a running batch job against processing
"too much" data: Assume the same scenario as before with HDFS + Connect.
For whatever reason, a Streams batch job takes longer than usual, and
while it is running, new data is appended to the topic because new files
(of the next batch) are already available. We don't want to process this
data with the currently running app, but want it to be included in the
next batch run (you could imagine that each batch job writes its result
into a different output topic). Thus, we want to have all data of a
batch processed completely and nothing beyond it, ie, provide strong
start-stop consistency.
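
As a rough illustration of what I mean by start-stop consistency, here
is another toy model in plain Python (again not Kafka code; the helper
names snapshot_end_offsets and run_batch and the record labels are made
up for this sketch). The stop-offsets are fixed once at startup, so data
that Connect appends while the job is running is left for the next run.

```python
# Toy model only: a topic maps partition -> list of records, and an
# offset is simply an index into that list.

def snapshot_end_offsets(topic):
    """Per partition, the offset one past the last record right now."""
    return {partition: len(records) for partition, records in topic.items()}


def run_batch(topic, start_offsets, stop_offsets):
    """Process exactly the records in [start, stop) of every partition."""
    processed = []
    for partition in sorted(topic):
        start = start_offsets.get(partition, 0)
        stop = stop_offsets[partition]
        processed.extend(topic[partition][start:stop])
    return processed


# Connect has imported the files of batch 1.
topic = {0: ["b1-r1", "b1-r2"], 1: ["b1-r3"]}

# Job 1 starts and fixes its stop-offsets.
stop_1 = snapshot_end_offsets(topic)

# While job 1 is still running, the files of batch 2 arrive.
topic[0].append("b2-r1")
topic[1].append("b2-r2")

first_run = run_batch(topic, {}, stop_1)

# Job 2 starts where job 1 stopped and fixes its own stop-offsets.
stop_2 = snapshot_end_offsets(topic)
second_run = run_batch(topic, stop_1, stop_2)

print(first_run)
print(second_run)
```

Each run sees its batch completely and nothing from the following one,
independent of how long the job takes.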


Sorry for the quite long answer. Hope it is convincing though :)


-Matthias


On 12/5/16 4:44 PM, Jay Kreps wrote:
> I'd like to second the discouragement of adding a new topic per job. We
> went down this path in Samza and I think the result was quite a mess. You
> had to read the full topic every time a job started and so it added a lot
> of overhead and polluted the topic space.
> 
> What if we did the following:
> 
>    1. Use timestamp instead of offset
>    2. Store the "stopping timestamp" in the metadata field associated with
>    the existing offset storage mechanism
>    3. Don't worry about fully processing the entire DAG. After all,
>    partially processing a tuple isn't much different from not processing it,
>    and in any case the stopping point is a heuristic so no point in being
>    overly precise here.
> 
> Probably I'm missing something, though, I haven't thought through the
> implications of using time instead of offset.
> 
> -Jay
> 
> On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Hi all,
>>
>> I want to start a discussion about KIP-95:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>
>> Looking forward to your feedback.
>>
>>
>> -Matthias
>>
>>
>>
> 
