With the marker, we have traded a failure problem for a liveness problem: under repeated failure, the other instances will not do any useful work.
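A minimal Java sketch of the marker scheme discussed in this thread, to make the liveness concern concrete. It does not use Kafka. The metadata topic is modeled as an in-memory list, and the names `MarkerProtocolSketch`, `leaderRound`, and `tryReadFixedStopOffsets` are hypothetical. The leader appends stop offsets and then a marker. Other instances treat stop offsets as fixed only once a marker follows them. If every leader attempt fails before writing the marker, readers keep getting nothing back, which is the "no useful work under repeated failure" case. The sketch assumes Java 16+ for the `record` type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the marker scheme discussed in the thread.
// Nothing here is Kafka or Kafka Streams API; requires Java 16+ (records).
public class MarkerProtocolSketch {

    // One entry in the simulated metadata topic: stop offsets, or a marker.
    record MetadataRecord(Map<String, Long> stopOffsets, boolean isMarker) {}

    // The simulated metadata topic is just an append-only list.
    static final List<MetadataRecord> metadataTopic = new ArrayList<>();

    // Leader side: write the collected HWs as stop offsets, then the marker.
    // If the leader "crashes" in between, no marker is written.
    static void leaderRound(Map<String, Long> highWatermarks, boolean crashBeforeMarker) {
        metadataTopic.add(new MetadataRecord(Map.copyOf(highWatermarks), false));
        if (crashBeforeMarker) {
            return;
        }
        metadataTopic.add(new MetadataRecord(Map.of(), true));
    }

    // Reader side: stop offsets count as fixed only if a marker follows them.
    // Returns null while no marker exists, i.e. the instance must keep blocking.
    static Map<String, Long> tryReadFixedStopOffsets() {
        Map<String, Long> latest = null;
        for (MetadataRecord r : metadataTopic) {
            if (!r.isMarker()) {
                latest = r.stopOffsets();   // most recent, not yet confirmed
            } else if (latest != null) {
                return latest;              // confirmed by the marker
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Long> hw = new HashMap<>();
        hw.put("input-0", 100L);
        // Repeated leader failures: no marker, so readers never unblock.
        for (int attempt = 1; attempt <= 3; attempt++) {
            leaderRound(hw, true);
            System.out.println("attempt " + attempt + ": " + tryReadFixedStopOffsets());
        }
        // A new leader re-collects the HW (it may have moved) and succeeds.
        hw.put("input-0", 120L);
        leaderRound(hw, false);
        System.out.println("after marker: " + tryReadFixedStopOffsets());
    }
}
```

Running `main` prints `null` for each failed attempt and `{input-0=120}` once a leader completes a full round. Nothing in the model bounds the number of failed rounds, so readers can block indefinitely.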
As mentioned in the other email, I don't know if we need to worry about that corner case just yet.

Eno

> On 30 Nov 2016, at 20:06, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Eno,
>
>> So in general, we have the problem of failures during writes to the metadata topic itself.
>
> The KIP suggests using marker messages for this case. The marker is either written (indicating success) or not. If not, after failure/rebalance the (new) group leader will collect the HW again. As long as the marker is not written, all other instances will not start processing -- they block until they read the marker, which ensures that the HW is fixed.
>
>> feel like we are trying to provide some sort of synchronization between app instances
>
> Not really. Only at application startup do instances reading source topics need to wait until the leader has written the stop offsets into the metadata topic (success determined by a marker record). Everything else is still completely decoupled. The only add-on is for consumers of intermediate topics -- they don't pick up the stop offsets at startup but whenever they become available later on.
>
> Does this make sense?
>
> -Matthias
>
> On 11/30/16 2:40 AM, Eno Thereska wrote:
>> Hi Matthias,
>>
>> I like the first part of the KIP. However, the second part with the failure modes and metadata topic is quite complex, and I'm worried it doesn't solve the problems you mention under failure. For example, the application can fail before writing to the metadata topic. In that case, it is not clear what the second app instance should do (for the intermediate-topics case). So in general, we have the problem of failures during writes to the metadata topic itself.
>>
>> Also, for the intermediate topics example, I feel like we are trying to provide some sort of synchronisation between app instances with this approach. By default today such synchronisation does not exist.
>> One instance writes to the intermediate topic, and the other reads from it, but only eventually. That is a nice way to decouple instances, in my opinion.
>>
>> The user can always run the batch processing multiple times, and eventually all instances will produce some output. The user's app can check whether the output size is satisfactory and then not run any further loops. So I feel they can already get a lot with the simpler first part of the KIP.
>>
>> Thanks
>> Eno
>>
>>> On 30 Nov 2016, at 05:45, Matthias J. Sax <matth...@confluent.io> wrote:
>>>
>>> Thanks for your input.
>>>
>>> To clarify: the main reason to add the metadata topic is to cope with subtopologies that are connected via intermediate topics (either user-defined via through() or internally created for data repartitioning).
>>>
>>> Without this handling, the behavior would be odd and the user experience would be bad.
>>>
>>> Thus, using the metadata topic to have a "fixed HW" is just a small add-on -- and more or less for free, because the metadata topic is already there.
>>>
>>> -Matthias
>>>
>>> On 11/29/16 7:53 PM, Neha Narkhede wrote:
>>>> Thanks for initiating this. I think this is a good first step towards unifying batch and stream processing in Kafka.
>>>>
>>>> I understood this capability to be simple yet very useful; it allows a Streams program to process a log, in batch, in arbitrary windows defined by the difference between the HW and the current offset. Basically, it provides a simple means for a Streams program to "stop" after processing a batch (just like a batch program would) and continue where it left off when restarted. In other words, it allows batch processing behavior for a Streams app without code changes.
>>>>
>>>> This feature is useful, but I do not think it is necessary to add a metadata topic. After all, the user doesn't really care as much about exactly where the batch ends.
>>>> This feature allows an app to "process as much as there is data to process", and the way it determines how much data there is to process is by reading the HW on startup. If new data is written to the log right after it starts up, it will process that data the next time it is restarted. If it starts, reads the HW, but fails, it will restart and process a little more before it stops again. The fact that the HW changes in some scenarios isn't an issue, since a batch program that behaves this way doesn't really care exactly what that HW is.
>>>>
>>>> There might be cases which require adding more topics, but I would shy away from adding complexity wherever possible, as it complicates operations and reduces simplicity.
>>>>
>>>> Other than this issue, I'm +1 on adding this feature. I think it is pretty powerful.
>>>>
>>>> On Mon, Nov 28, 2016 at 10:48 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I want to start a discussion about KIP-95:
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>>>>
>>>>> Looking forward to your feedback.
>>>>>
>>>>> -Matthias
>>>>
>>>> --
>>>> Thanks,
>>>> Neha
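Neha's alternative (stop at whatever HW the app sees on startup, then resume from the committed position on the next run) can be modeled in a few lines. This is a hypothetical Java sketch, not the Kafka Streams implementation. A single partition is an in-memory list, and the list size stands in for the HW, which assumes every record is already replicated. `StopAtStartupHwSketch`, `runOnce`, and `committedOffset` are illustrative names. The `concurrentWrites` hook simulates records that arrive after the HW snapshot; they are left for the next run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model; not Kafka Streams API.
// The log size stands in for the HW (i.e. every record is assumed replicated).
public class StopAtStartupHwSketch {

    static final List<String> log = new ArrayList<>(); // simulated partition
    static long committedOffset = 0;                    // where the next run resumes

    // One batch run: snapshot the HW once, process up to it, commit, then stop.
    static List<String> runOnce(Runnable concurrentWrites) {
        long stopOffset = log.size();   // HW read on startup
        concurrentWrites.run();         // records appended now land beyond stopOffset
        List<String> processed = new ArrayList<>();
        for (long offset = committedOffset; offset < stopOffset; offset++) {
            processed.add(log.get((int) offset));
        }
        committedOffset = stopOffset;   // next run continues from here
        return processed;
    }

    public static void main(String[] args) {
        log.add("a");
        log.add("b");
        log.add("c");
        System.out.println(runOnce(() -> log.add("d"))); // [a, b, c]; "d" arrived after the snapshot
        System.out.println(runOnce(() -> {}));          // [d]
        System.out.println(runOnce(() -> {}));          // []
    }
}
```

Because the stop offset is snapshotted once per run, the exact batch boundary depends on timing. That is the property Neha argues users don't need to control: every record is processed by some run, just not necessarily the first one.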