Hi Igor,
Thank you for your answers!
I think I understand your reasoning.
Looking at your proposal, some questions/comments arose.
1. Who would be responsible for the topic to which the late records
should be sent? Is it a topic users create and manage or is it an
internal topic that is managed by Streams and that is deleted when the
application is reset? This is not clear from your KIP.
2. I guess you also need an overload for session windows and not only
for time windows. Once KIP-450 is implemented, you would also need an
overload for sliding windows.
I also think that we should discuss some other approaches since this
includes a change to Streams' public API and we should be judicious with
such changes.
A. Adding an operator to the DSL that starts a stream for late records.
Something like the following:
KTable aggregation =
    builder.stream(...).groupByKey().windowedBy(...).count();
aggregation.toStream().to(...);
aggregation.lateRecords().to(...);
That would conceptually branch an aggregation into a stream for the
aggregation results and a stream for the late records. The advantage
would be that you could use DSL operators on the late records stream.
Additionally, it makes the code more self-describing. I guess this
approach also goes in the direction that Matthias mentioned regarding a
built-in dead-letter queue.
B. Adding a callback and providing a default implementation that writes
the late records to a late-records topic. That would address your
concern about users being forced to provide a callback. It would not by
itself solve your other concern about having to use the Producer API,
but maybe we could find a way around that as well.
Best,
Bruno
On 25.08.20 20:09, Igor Piddubnyi wrote:
Hi Matthias, Bruno,
Let me elaborate on my suggestions regarding late-record handling.
> why you rejected the more general solution involving a callback
The main reasons why I lean toward the topic approach are API
consistency and the cleanliness of the user code.
The problem with a callback, in my opinion, is that users of the API
will be forced to define handling for each late record using procedural
code.
The same custom handling (statistics, DB inserts, etc.) could be
achieved by consuming the topic.
I acknowledge that the topic approach introduces overhead compared to
the callback; however, by paying this price users get all
stream-processing features.
Looking at it from the opposite side: assuming there is a callback and
the data needs to be persisted in a topic, one would have to fall back
to the Producer API to implement such handling. That would not be very
clean, from my point of view.
>I am wondering if we should try to do a built-in "dead-letter-queue"
feature that would be general purpose?
A generic DLQ might not be a bad idea; however, there could be more
than one aggregation. Assuming the DLQ is generic and contains messages
with other kinds of errors, the API definitely needs a way to
distinguish between messages with different types of errors.
This would definitely be a significant change. Taking into account that
this is my first experience with Kafka internals, I tried to keep the
suggested change as small as possible.
> I am also wondering, if piping late records into a DLQ is the only
way to handle them
Definitely not, but in my opinion the Streams API is a better fit for
any custom handling a user may define.
E.g. I don't see any problem with defining another processing pipe that
consumes from the DLQ, performs any necessary side effects, and then
gets merged back anywhere.
>Can you elaborate on the use-case how a user would use the preserved
late records?
As just explained, I see this as another processing pipe, or just a
consumer, reading data from this topic and doing any necessary
handling. This might even happen in another service, if required by the
logic. The docs should probably be updated with respective examples.
E.g. in the system I'm working on, such handling would involve complex
data-correction in the database and would be executed on a separate
instance.
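To illustrate the idea, the data-correction step could look roughly
like the following. This is a deliberately simplified sketch: plain
Java, with a map standing in for the database, and all names
(LateRecordCorrector, correct) invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for a consumer of the late-records topic that corrects
// previously stored aggregates; the map plays the role of the database.
class LateRecordCorrector {
    final Map<String, Long> aggregates = new HashMap<>();

    // Re-apply a late value to the stored aggregate for its key.
    void correct(String key, long lateValue) {
        aggregates.merge(key, lateValue, Long::sum);
    }
}
```

In the real system this would of course run against the actual
database, possibly on a separate instance, as described above.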
My arguments and answers might be quite biased, because I mostly
consider this use-case for the application currently being developed.
Please share your opinion and feedback.
Regards, Igor.
On 11.08.20 09:25, Bruno Cadonna wrote:
Hi Igor,
Thanks for the KIP!
Similar to Matthias, I am also wondering why you rejected the more
general solution involving a callback. I also think that writing to a
topic is just one of multiple ways to handle late records. For
example, one could compute statistics over the late records before, or
instead of, writing them to a topic. Or one could write the records
to a database for analysis.
Best,
Bruno
On 28.07.20 05:14, Matthias J. Sax wrote:
Thanks for the KIP Igor.
What you propose sounds a little bit like a "dead-letter-queue" pattern.
Thus, I am wondering if we should try to do a built-in
"dead-letter-queue" feature that would be general purpose? For example,
users can drop messages in the source node if they don't have a valid
timestamp or if a deserialization error occurs, and they face a similar
issue for those cases (even if it might be a little simpler to handle
those cases, as custom user code is executed).
For a general-purpose DLQ, the feature should be exposed at the
Processor API level though, and the DSL would just use this feature
(instead of introducing it as a DSL feature).
Late records are of course only defined at the DSL level, as PAPI
users need to define custom semantics. Also, late records are not
really corrupted. However, the pattern seems similar enough, i.e.,
piping late data into a topic is just a special case of a DLQ?
I am also wondering if piping late records into a DLQ is the only way
to handle them? For example, I could imagine that a user just wants to
trigger a side effect (similar to what you mention in the rejected
alternatives)? Or maybe a user might even want to somehow process those
records and feed them back into the actual processing pipeline.
Last, a DLQ is only useful if somebody consumes from the topic and does
something with the data. Can you elaborate on the use-case how a user
would use the preserved late records?
-Matthias
On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
Hi everybody,
I would like to start off the discussion for KIP-647:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
This KIP proposes a minor adjustment to the kafka-streams
aggregation API, adding the ability to process late messages.
[WIP] PR here:https://github.com/apache/kafka/pull/9017
Please check.
Regards, Igor.