Thanks for the KIP, Igor. What you propose sounds a bit like a "dead-letter-queue" (DLQ) pattern, so I am wondering whether we should build a general-purpose "dead-letter-queue" feature instead. For example, users can drop messages in the source node if they don't have a valid timestamp or if a deserialization error occurs, and they face a similar issue in those cases (even if those cases might be a little simpler to handle, as custom user code is executed).
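To make the idea a bit more concrete, here is a rough sketch in plain Java, with no Kafka dependencies. Every name in it (`DroppedRecordHandler`, `DropReason`, `WindowedCounter`, `InMemoryDeadLetterQueue`) is made up for illustration and is not an existing or proposed Kafka Streams API. The lateness check (window end plus grace period at or before stream time) is a simplified assumption about how late records are detected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Kafka Streams.
class DroppedRecordSketch {

    enum DropReason { INVALID_TIMESTAMP, DESERIALIZATION_ERROR, LATE_RECORD }

    static final class DroppedRecord {
        final String key;
        final String value;
        final long timestamp;
        final DropReason reason;

        DroppedRecord(String key, String value, long timestamp, DropReason reason) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.reason = reason;
        }
    }

    // One general-purpose hook for every kind of dropped record.
    interface DroppedRecordHandler {
        void handle(DroppedRecord record);
    }

    // Stand-in for "write to a DLQ topic": it just collects the records.
    static final class InMemoryDeadLetterQueue implements DroppedRecordHandler {
        final List<DroppedRecord> records = new ArrayList<>();

        @Override
        public void handle(DroppedRecord record) {
            records.add(record);
        }
    }

    // Toy tumbling-window count that hands dropped records to the handler.
    static final class WindowedCounter {
        private final long windowSizeMs;
        private final long graceMs;
        private final DroppedRecordHandler handler;
        private final Map<Long, Long> countsByWindowStart = new HashMap<>();
        private long streamTime = Long.MIN_VALUE;

        WindowedCounter(long windowSizeMs, long graceMs, DroppedRecordHandler handler) {
            this.windowSizeMs = windowSizeMs;
            this.graceMs = graceMs;
            this.handler = handler;
        }

        // Returns true if the record was counted, false if it was dropped.
        boolean process(String key, String value, long timestamp) {
            if (timestamp < 0) {
                handler.handle(new DroppedRecord(key, value, timestamp, DropReason.INVALID_TIMESTAMP));
                return false;
            }
            streamTime = Math.max(streamTime, timestamp);
            long windowStart = timestamp - (timestamp % windowSizeMs);
            long windowEnd = windowStart + windowSizeMs;
            // Assumed rule: the window is closed once its end plus grace has passed.
            if (windowEnd + graceMs <= streamTime) {
                handler.handle(new DroppedRecord(key, value, timestamp, DropReason.LATE_RECORD));
                return false;
            }
            countsByWindowStart.merge(windowStart, 1L, Long::sum);
            return true;
        }

        long count(long windowStart) {
            return countsByWindowStart.getOrDefault(windowStart, 0L);
        }
    }

    public static void main(String[] args) {
        // A handler does not have to be a queue; it can be a pure side effect.
        DroppedRecordHandler logOnly =
            r -> System.out.println("dropped " + r.key + " because " + r.reason);
        WindowedCounter counter = new WindowedCounter(10, 5, logOnly);
        counter.process("a", "v1", 3);
        counter.process("a", "v2", 25);
        counter.process("a", "v3", 4); // window [0, 10) is already closed
    }
}
```

In this sketch the DLQ is only one implementation of the handler, and late records are only one drop reason among several.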
For a general-purpose DLQ, the feature should be exposed at the Processor API level though, and the DSL would just use this feature (instead of introducing it as a DSL feature). Late records are of course only defined at the DSL level, as PAPI users need to define custom semantics. Also, late records are not really corrupted. However, the pattern seems similar enough, i.e., piping late data into a topic is just a special case of a DLQ?

I am also wondering whether piping late records into a DLQ is the only way to handle them. For example, I could imagine that a user just wants to trigger a side effect (similar to what you mention in the rejected alternatives). Or a user might even want to process those records somehow and feed them back into the actual processing pipeline.

Last, a DLQ is only useful if somebody consumes from the topic and does something with the data. Can you elaborate on the use case, i.e., how a user would use the preserved late records?

-Matthias

On 7/27/20 1:45 AM, Igor Piddubnyi wrote:
> Hi everybody,
> I would like to start off the discussion for KIP-647:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-647%3A+Add+ability+to+handle+late+messages+in+streams-aggregation
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-594%3A+Expose+output+topic+names+from+TopologyTestDriver>
>
> This KIP proposes a minor adjustment in the kafka-streams
> aggregation-api, adding an ability for processing late messages.
> [WIP] PR here: https://github.com/apache/kafka/pull/9017
>
> Please check.
> Regards, Igor.