In general, I like Bruno's proposal. I was also checking how Flink handles this case, and similar to Bruno's proposal, they have a so-called "side output": https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
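For readers less familiar with the semantics under discussion, here is a minimal standalone sketch in plain Java of how records would be split between the normal aggregation and a late-record "side output". This is not actual Kafka Streams or Flink API; the class, window size, and grace period are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch (hypothetical, not real Streams/Flink API): once
// stream time has advanced past a window's close (window end + grace
// period), records falling into that window are "late" and would be
// routed to the side output instead of the normal aggregation.
public class LateRecordRouting {
    static final long WINDOW_SIZE_MS = 1000L; // assumed tumbling window size
    static final long GRACE_MS = 500L;        // assumed grace period

    // Returns {onTime, late} given record timestamps in arrival order.
    static List<List<Long>> route(long[] timestamps) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts); // stream time = max timestamp seen so far
            long windowEnd = (ts / WINDOW_SIZE_MS + 1) * WINDOW_SIZE_MS;
            if (windowEnd + GRACE_MS <= streamTime) {
                late.add(ts);   // window already closed: side output
            } else {
                onTime.add(ts); // still within grace: normal aggregation
            }
        }
        return List.of(onTime, late);
    }

    public static void main(String[] args) {
        // The record at t=400 arrives after stream time reached 2600,
        // i.e. after its window [0,1000) plus grace has closed, so it is late.
        List<List<Long>> result = route(new long[]{100, 900, 2600, 400});
        // prints: onTime=[100, 900, 2600] late=[400]
        System.out.println("onTime=" + result.get(0) + " late=" + result.get(1));
    }
}
```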
The only "concern" I have with Bruno's proposal is that we would add `KTable.lateRecords()` while currently not all KTables can have late records: for example, a non-windowed aggregation, a `builder.table()`, or a table-table-join result. -- We could argue that this is not really an issue, as the "late record stream" would just be empty. (Side note: this might change in the future... I am just brainstorming about a new KIP that proposes to add a "grace period" to all KTables...) However, we would also need a similar "side output" for stream-stream joins, and thus would need to add `KStream.lateRecords()`, which again would not produce any output for all existing stateless operations. Maybe it would still be OK, but we should at least discuss/consider the impact. It might be confusing for users. Not sure "how bad" it would be.

I agree that letting users use the Producer API to write late records into a DLQ topic seems undesirable and that Kafka Streams should handle it automatically. -- In general, I also see your point about having different DLQ topics instead of a centralized one. However, my train of thought was about other "corrupted" records that users might want to "side output": for example, records that are dropped by the `DeserializationExceptionHandler`, or records with a null key that are dropped by join/aggregation operators (and a few others). It might be good to have a unified solution for all those cases?

-Matthias

On 8/26/20 1:44 AM, Bruno Cadonna wrote:
> Hi Igor,
>
> Thank you for your answers!
>
> I think I understand your reasoning.
>
> Looking at your proposal, some questions/comments arose.
>
> 1. Who would be responsible for the topic to which the late records
> should be sent? Is it a topic users create and manage, or is it an
> internal topic that is managed by Streams and that is deleted when the
> application is reset? This is not clear from your KIP.
>
> 2. I guess you also need an overload for session windows and not only
> for time windows.
> Once KIP-450 is implemented, you would also need an
> overload for sliding windows.
>
> I also think that we should discuss some other approaches, since this
> includes a change to Streams' public API and we should be judicious with
> such changes.
>
> A. Adding an operator to the DSL that starts a stream for late records.
> Something like the following:
>
>     KTable aggregation =
>         builder.stream(...).groupByKey().windowedBy(...).count();
>
>     aggregation.toStream().to(...);
>     aggregation.lateRecords().to(...);
>
> That would conceptually branch an aggregation into a stream for the
> aggregation results and a stream for the late records. The advantage
> would be that you could use DSL operators on the late records stream.
> Additionally, it makes the code more self-describing. I guess this
> approach also goes in the direction that Matthias mentioned regarding a
> built-in dead letter queue.
>
> B. Adding a callback and providing an implementation that would write
> the late records to a late-records topic. That would address your concern
> about users being forced to provide the callback. That would also solve
> your other concern about using the Producer API, but maybe we could
> find another way to solve that.
>
> Best,
> Bruno
>
> On 25.08.20 20:09, Igor Piddubnyi wrote:
>> Hi Matthias, Bruno,
>> Let me elaborate on my suggestions regarding late-tick handling.
>>
>> > why you rejected the more general solution involving a callback
>> The main reasons why I tend toward the topic approach are API consistency
>> and cleanness of the user code.
>> The problem with a callback, in my opinion, is that users of the API
>> will be forced to define handling for each late item using
>> procedural code.
>> The same custom handling (statistics, db-insert, etc.) could be
>> achieved by consuming the topic.
>> I acknowledge that the topic approach introduces overhead compared to
>> the callback; however, by paying this price users get all
>> stream-processing features.
>> Taking a look from the opposite side: assuming there is a callback
>> and there is a need to persist the data in a topic, one would have to
>> fall back to the producer API to implement such handling. This would
>> not be so clean, from my point of view.
>>
>> > I am wondering if we should try to do a built-in "dead-letter-queue"
>> > feature that would be general purpose?
>> A generic DLQ might not be such a bad idea; however, there could be more
>> than one aggregation. Assuming the DLQ is generic and contains messages
>> with other kinds of errors, the API definitely needs an ability to
>> distinguish between messages with different types of errors.
>> This would definitely be a significant change. Taking into account
>> that this is my first experience with Kafka internals, I tried to keep
>> the suggested change as small as possible.
>>
>> > I am also wondering if piping late records into a DLQ is the only
>> > way to handle them
>> Definitely not, but in my opinion the streams API fits better for any
>> custom handling that a user can define.
>> E.g. I don't see any problem defining another processing pipe, which
>> consumes from the DLQ, performs any necessary side effects, and then
>> gets merged anywhere.
>>
>> > Can you elaborate on the use-case how a user would use the preserved
>> > late records?
>> As just explained, I see this as another processing pipe, or just a
>> consumer, reading data from this topic and doing any necessary handling.
>> This might even happen in another service, if required by the logic.
>> The docs should probably be updated with respective examples.
>>
>> E.g. in the system I'm working on, such handling would involve complex
>> data correction in the database and would be executed on a separate
>> instance.
>> My arguments and answers might be quite biased, because I mostly
>> consider this use-case for the application currently being developed.
>> Please share your opinion and feedback.
>>
>> Regards, Igor.
>>
>> On 11.08.20 09:25, Bruno Cadonna wrote:
>>> Hi Igor,
>>>
>>> Thanks for the KIP!
>>>
>>> Similar to Matthias, I am also wondering why you rejected the more
>>> general solution involving a callback. I also think that writing to a
>>> topic is just one of multiple ways to handle late records. For
>>> example, one could compute statistics over the late records before or
>>> instead of writing the records to a topic. Or one could write the
>>> records to a database for analysis.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 28.07.20 05:14, Matthias J. Sax wrote:
>>>> Thanks for the KIP, Igor.
>>>>
>>>> What you propose sounds a little bit like a "dead-letter-queue"
>>>> pattern.
>>>> Thus, I am wondering if we should try to do a built-in
>>>> "dead-letter-queue" feature that would be general purpose? For example,
>>>> users can drop messages in the source node if they don't have a valid
>>>> timestamp or if a deserialization error occurs, and face a similar issue
>>>> for those cases (even if it might be a little simpler to handle those
>>>> cases, as custom user code is executed).
>>>>
>>>> For a general-purpose DLQ, the feature should be exposed at the
>>>> Processor API level though, and the DSL would just use this feature
>>>> (instead of introducing it as a DSL feature).
>>>>
>>>> Late records are of course only defined at the DSL level, as for the
>>>> PAPI users need to define custom semantics. Also, late records are not
>>>> really corrupted. However, the pattern seems similar enough, i.e.,
>>>> piping late data into a topic is just a special case of a DLQ?
>>>>
>>>> I am also wondering if piping late records into a DLQ is the only way
>>>> to handle them? For example, I could imagine that a user just wants to
>>>> trigger a side effect (similar to what you mention in rejected
>>>> alternatives)? Or maybe a user might even want to somehow process those
>>>> records and feed them back into the actual processing pipeline.
>>>>
>>>> Last, a DLQ is only useful if somebody consumes from the topic and does
>>>> something with the data. Can you elaborate on the use-case how a user
>>>> would use the preserved late records?
>>>>
>>>> -Matthias
>>>>
>>>> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>>>>> Hi everybody,
>>>>> I would like to start off the discussion for KIP-647:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
>>>>>
>>>>> This KIP proposes a minor adjustment to the kafka-streams
>>>>> aggregation API, adding an ability to process late messages.
>>>>> [WIP] PR here: https://github.com/apache/kafka/pull/9017
>>>>>
>>>>> Please check.
>>>>> Regards, Igor.