In general, I like Bruno's proposal. I also checked how Flink handles
this case and, similar to Bruno's proposal, it has a so-called
"side output":
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

The only "concern" I have with Bruno's proposal is that we would add
`KTable.lateRecords()`, and currently not all KTables have late records.
For example, a non-windowed aggregation, a `builder.table()`, or a
table-table join result has none. -- We could argue that this is not
really an issue, as the "late record stream" would just be empty.
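
To make the "side output" idea a bit more concrete, here is a minimal
plain-Java sketch. It is not Kafka Streams code: the class
`SideOutputWindowedCounter` and all of its methods are made up for
illustration, and the "late" check (window end plus grace period at or
before the highest timestamp seen so far) is only my simplified
assumption of how a windowed aggregation decides to drop a record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a windowed count with a grace period and a
// "side output" for late records. Plain Java, not the Kafka Streams API.
final class SideOutputWindowedCounter {
    private final long windowSizeMs;
    private final long graceMs;
    // Highest record timestamp observed so far ("stream time").
    private long streamTimeMs = Long.MIN_VALUE;
    // Main output: "key@windowStart" -> count.
    private final Map<String, Long> counts = new HashMap<>();
    // Side output: late records, encoded as "key@timestamp".
    private final List<String> lateRecords = new ArrayList<>();

    SideOutputWindowedCounter(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Assumes non-negative timestamps.
    void process(String key, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long windowCloseMs = windowStart + windowSizeMs + graceMs;
        if (windowCloseMs <= streamTimeMs) {
            // Window is closed: emit to the side output instead of
            // dropping the record silently.
            lateRecords.add(key + "@" + timestampMs);
        } else {
            counts.merge(key + "@" + windowStart, 1L, Long::sum);
        }
    }

    Map<String, Long> counts() { return counts; }

    List<String> lateRecords() { return lateRecords; }
}
```

An operator that never closes a window would never take the first
branch, so its side output would just stay empty, which is the case I
describe above for non-windowed KTables.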

(Side note: this might change in the future... I am just brainstorming
about a new KIP that proposes to add a "grace period" to all KTables...)

However, we would also need a similar "side output" for stream-stream
joins, and thus would need to add `KStream.lateRecords()`, which again
would not produce any output for any of the existing stateless operations.

Maybe it would still be ok, but we should at least discuss/consider the
impact. It might be confusing for users. Not sure "how bad" it would be.


I agree that letting users use the Producer API to write late records
into a DLQ topic seems undesirable and that Kafka Streams should handle
it automatically. -- In general, I also see your point about having
different DLQ topics instead of a centralized one. However, my train of
thought was about other "corrupted" records that users might want to
"side output": for example, records that are dropped by the
`DeserializationExceptionHandler` or records with a null key that are
dropped by join/aggregation operators (and a few others). It might be
good to have a unified solution for all those cases?
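
As a rough illustration of what "unified" could mean, here is a small
plain-Java sketch. Everything in it (`DropReason`, `DroppedRecord`,
`DeadLetterSink`) is a hypothetical name I made up for this email, not
an existing or proposed Kafka Streams API. The only point is that each
dropped record carries a reason and the operator that dropped it, so
downstream code can tell the different kinds of drops apart even if
they all go through the same channel.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reasons for which a record might be dropped.
enum DropReason { LATE_RECORD, DESERIALIZATION_ERROR, NULL_KEY }

// Hypothetical envelope around a dropped record.
final class DroppedRecord {
    final DropReason reason;
    final String operator; // name of the operator that dropped the record
    final String payload;  // simplified: the record rendered as a string

    DroppedRecord(DropReason reason, String operator, String payload) {
        this.reason = reason;
        this.operator = operator;
        this.payload = payload;
    }
}

// Hypothetical single sink that all operators report dropped records to.
final class DeadLetterSink {
    private final List<DroppedRecord> records = new ArrayList<>();

    void report(DropReason reason, String operator, String payload) {
        records.add(new DroppedRecord(reason, operator, payload));
    }

    // Returns only the records dropped for the given reason, in arrival order.
    List<DroppedRecord> byReason(DropReason reason) {
        List<DroppedRecord> result = new ArrayList<>();
        for (DroppedRecord r : records) {
            if (r.reason == reason) {
                result.add(r);
            }
        }
        return result;
    }

    int size() { return records.size(); }
}
```

With something like this, a user who only cares about late records
would filter on `DropReason.LATE_RECORD`, while separate DLQ topics per
operator could still be derived from the `operator` field.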


-Matthias


On 8/26/20 1:44 AM, Bruno Cadonna wrote:
> Hi Igor,
> 
> Thank you for your answers!
> 
> I think I understand your reasoning.
> 
> Looking at your proposal, some questions/comments arose.
> 
> 1. Who would be responsible for the topic to which the late records
> should be sent? Is it a topic users create and manage or is it an
> internal topic that is managed by Streams and that is deleted when the
> application is reset? This is not clear from your KIP.
> 
> 2. I guess you also need an overload for session windows and not only
> for time windows. Once KIP-450 is implemented, you would also need an
> overload for sliding windows.
> 
> 
> I also think that we should discuss some other approaches since this
> includes a change to Streams' public API and we should be judicious with
> such changes.
> 
> A. Adding an operator to the DSL that starts a stream for late records.
> Something like the following:
> 
> KTable aggregation =
>     builder.stream(...).groupByKey().windowedBy().count();
> 
> aggregation.toStream().to(...);
> aggregation.lateRecords().to(...);
> 
> That would conceptually branch an aggregation into a stream for the
> aggregation results and a stream for the late records. The advantage
> would be that you could use DSL operators on the late records stream.
> Additionally, it makes the code more self-descriptive. I guess this
> approach also goes in the direction that Matthias mentioned regarding a
> built-in dead letter queue.
> 
> B. Adding a callback and providing the implementation that would write
> the late records to a late-records topic. That would address your concern
> about users being forced to provide the callback. That would not solve
> your other concern about using the Producer API, but maybe we could also
> find a way to solve that.
> 
> Best,
> Bruno
> 
> 
> On 25.08.20 20:09, Igor Piddubnyi wrote:
>> Hi Matthias, Bruno,
>> Let me elaborate on my suggestions regarding late-tick handling.
>>
>>  > why you rejected the more general solution involving a callback
>> The main reasons why I lean towards the topic approach are API
>> consistency and cleanliness of the user code.
>> The problem with a callback, in my opinion, is that users of the API
>> will be forced to define handling for each late-item using
>> procedural-code.
>> The same custom handling (statistics, db-insert, etc.) could be
>> achieved by consuming the topic.
>> I acknowledge that the topic approach introduces an overhead compared to
>> the callback; however, by paying this price users get all
>> stream-processing features.
>> Looking at it from the opposite side, assuming there is a callback
>> and there is a need to persist data in the topic, one would have to
>> fall back to the Producer API to implement such handling. This would
>> not be so clean, from my point of view.
>>
>>  >I am wondering if we should try to do a built-in "dead-letter-queue"
>> feature that would be general purpose?
>> A generic DLQ might not be such a bad idea; however, there could be more
>> than one aggregation. Assuming the DLQ is generic and contains messages
>> with other kinds of errors, the API definitely needs a way to distinguish
>> between messages with different types of errors.
>> This would definitely be a significant change. Taking into account
>> that this is my first experience with Kafka internals, I tried to keep
>> the suggested change as small as possible.
>>
>>  > I am also wondering, if piping late records into a DLQ is the only
>> way to handle them
>> Definitely not, but in my opinion the stream API is a better fit for any
>> custom handling that a user can define.
>> E.g. I don't see any problem with defining another processing pipe, which
>> consumes from the DLQ, performs any necessary side effects, and then gets
>> merged anywhere.
>>
>>  >Can you elaborate on the use-case how a user would use the preserved
>> late records?
>> As just explained, I see this as another processing pipe, or just a
>> consumer, reading data from this topic and doing any necessary handling.
>> This might happen even in another service, if required by the logic.
>> Docs probably should be updated with respective examples.
>>
>> E.g. in the system I'm working on, such handling would involve complex
>> data-correction in the database and would be executed on a separate
>> instance.
>> My arguments and answers might be quite biased, because I mostly
>> consider this use-case for the application currently being developed.
>> Please share your opinion and feedback.
>>
>> Regards, Igor.
>>
>> On 11.08.20 09:25, Bruno Cadonna wrote:
>>> Hi Igor,
>>>
>>> Thanks for the KIP!
>>>
>>> Similar to Matthias, I am also wondering why you rejected the more
>>> general solution involving a callback. I also think that writing to a
>>> topic is just one of multiple ways to handle late records. For
>>> example, one could compute statistics over the late records before, or
>>> instead of, writing the records to a topic. Or one could write the
>>> records to a database for analysis.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 28.07.20 05:14, Matthias J. Sax wrote:
>>>> Thanks for the KIP Igor.
>>>>
>>>> What you propose sounds a little bit like a "dead-letter-queue" pattern.
>>>> Thus, I am wondering if we should try to do a built-in
>>>> "dead-letter-queue" feature that would be general purpose? For example,
>>>> users can drop messages in the source node if they don't have a valid
>>>> timestamp or if a deserialization error occurs, and face a similar issue
>>>> for those cases (even if it might be a little simpler to handle those
>>>> cases, as custom user code is executed).
>>>>
>>>> For a general purpose DLQ, the feature should be exposed at the
>>>> Processor API level though, and the DSL would just use this feature
>>>> (instead of introducing it as a DSL feature).
>>>>
>>>> Late records are of course only defined at the DSL level, as for the
>>>> PAPI users need to define custom semantics. Also, late records are not
>>>> really corrupted. However, the pattern seems similar enough, i.e., piping
>>>> late data into a topic is just a special case for a DLQ?
>>>>
>>>> I am also wondering if piping late records into a DLQ is the only way
>>>> to handle them? For example, I could imagine that a user just wants to
>>>> trigger a side-effect (similar to what you mention in rejected
>>>> alternatives)? Or maybe a user might even want to somehow process those
>>>> records and feed them back into the actual processing pipeline.
>>>>
>>>> Last, a DLQ is only useful if somebody consumes from the topic and does
>>>> something with the data. Can you elaborate on the use-case how a user
>>>> would use the preserved late records?
>>>>
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
>>>>> Hi everybody,
>>>>> I would like to start off the discussion for KIP-647:
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
>>>>>
>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver>
>>>>>
>>>>>
>>>>>
>>>>> This KIP proposes a minor adjustment to the Kafka Streams
>>>>> aggregation API, adding the ability to process late messages.
>>>>> [WIP] PR here: https://github.com/apache/kafka/pull/9017
>>>>>
>>>>> Please check.
>>>>> Regards, Igor.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
