[https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17556070#comment-17556070]
ATZMON CHEN TOV edited comment on KAFKA-8769 at 6/19/22 4:00 PM:
-----------------------------------------------------------------
Hi [~mjsax],
For us, it would be very useful to have per-key suppression *in addition* to
the existing suppression based on grace over stream time. What I mean is that
when a record for a key arrives in a window, it would evict the aggregation of
that key's previous window. If no record arrives for the 'next' window of a
given key, the existing grace-on-stream-time mechanism would evict it.
The business benefit is that this avoids unnecessarily delaying all of the
traffic. I believe this would benefit the common case, where order per key *is*
(at least effectively) guaranteed.
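A minimal Python model of the proposed behavior may help pin down the semantics. It is a sketch under stated assumptions, not Kafka Streams code and not an existing `Suppressed` option: a single partition, tumbling windows with integer timestamps (minutes), and a per-window record count standing in for the aggregation. `EagerPerKeySuppressor` is a hypothetical name. The window-close rule (stream time reaching window end + grace) only roughly mirrors today's "until window closes" suppression.

```python
class EagerPerKeySuppressor:
    """Hypothetical model of 'eager' per-key suppression (illustration only, not Kafka code).

    A key's buffered window is emitted as soon as a record for a later window of the
    same key arrives; otherwise it is emitted once the partition's stream time reaches
    window end + grace, as with grace-based suppression.
    """

    def __init__(self, window_size: int, grace: int):
        self.window_size = window_size
        self.grace = grace
        self.stream_time = None  # per-partition stream time: highest timestamp seen
        self.buffer = {}         # (key, window_start) -> buffered record count
        self.evicted = {}        # key -> start of the newest window already emitted
        self.emitted = []        # (key, window_start, count, reason) in emission order

    def _closed(self, start: int) -> bool:
        # Assumed close rule: stream time >= window end + grace.
        return self.stream_time >= start + self.window_size + self.grace

    def _emit(self, key: str, start: int, reason: str) -> None:
        self.emitted.append((key, start, self.buffer.pop((key, start)), reason))
        self.evicted[key] = max(self.evicted.get(key, start), start)

    def process(self, key: str, ts: int) -> None:
        self.stream_time = ts if self.stream_time is None else max(self.stream_time, ts)
        start = ts - ts % self.window_size
        # Drop late records: window already closed by grace, or already emitted for this key.
        if self._closed(start) or start <= self.evicted.get(key, start - 1):
            return
        # Eager per-key eviction: a record in a newer window flushes this key's older windows.
        for k, s in sorted(self.buffer):
            if k == key and s < start:
                self._emit(k, s, "eager")
        self.buffer[(key, start)] = self.buffer.get((key, start), 0) + 1
        # Fallback: grace on stream time evicts windows of keys that went quiet.
        for k, s in sorted(self.buffer):
            if self._closed(s):
                self._emit(k, s, "grace")


s = EagerPerKeySuppressor(window_size=60, grace=10)  # hourly windows, 10-minute grace
for key, ts in [("A", 5), ("B", 10), ("A", 50), ("A", 61), ("A", 75), ("A", 130)]:
    s.process(key, ts)
print(s.emitted)
# [('A', 0, 2, 'eager'), ('B', 0, 1, 'grace'), ('A', 60, 2, 'eager')]
```

Key A's first hour is emitted at stream time 61, as soon as A's next-hour record arrives, while the quiet key B still waits for the grace fallback at stream time 75 (at or past 60 + 10). A record that arrives for a window its key has already emitted is simply dropped, which is only safe when per-key order is guaranteed, as assumed above.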
In our use case, we do hourly aggregation with a 10-minute grace period (due to
endpoint clock differences and multiple providers). With per-key eviction, we
hope to make a large percentage of our aggregations available to users less
than 2 minutes after the end of the hour.
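The gap between the two policies can be written as a bound on the timestamp $T_{\text{emit}}$ of the record that triggers emission of the window starting at hour $h$, with grace $G$ (here 10 min). The per-key bound assumes, as a labeled assumption, that key $k$ is ordered and has a record in that hour, with consecutive records of $k$ at most $\Delta_k$ apart; if $k$ falls silent, the grace bound applies instead.

```latex
\begin{aligned}
\text{grace only:}\quad & T_{\text{emit}} = \min\{\, t_i : t_i \ge h + 60\,\text{min} + G \,\} \ge h + 70\,\text{min} \\
\text{eager per key:}\quad & T_{\text{emit}}(k) = \min\{\, t_i : \mathrm{key}_i = k,\; t_i \ge h + 60\,\text{min} \,\} < h + 60\,\text{min} + \Delta_k
\end{aligned}
```

So the 2-minute target corresponds roughly to keys that report at least every 2 minutes, versus a floor of 10 minutes under grace-only suppression.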
In the API, this could be added as a boolean 'eager' parameter to suppress.
Alternatively, it could be set declaratively in the Streams properties as
'producer-guaranteed-order-per-key'.
was (Author: JIRAUSER291214):
Hi [~mjsax],
For us, it would be very useful to have per-key suppression *in addition* to
the existing suppression based on grace over stream time. What I mean is that
when a record for a key arrives in a window, it would evict the aggregation of
that key's previous window. If no record arrives for the 'next' window of a
given key, the existing grace-on-stream-time mechanism would evict it.
The business benefit is that this avoids unnecessarily delaying all of the
traffic. I believe this would benefit the common case, where order per key *is*
(at least effectively) guaranteed.
In our use case, we do hourly aggregation with a 10-minute grace period (due to
endpoint clock differences). With per-key eviction, we hope to make a large
percentage of our aggregations available to users less than 2 minutes after the
end of the hour.
In the API, this could be added as a boolean 'eager' parameter to suppress.
Alternatively, it could be set declaratively in the Streams properties as
'producer-guaranteed-order-per-key'.
> Consider computing stream time independently per key
> ----------------------------------------------------
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Major
> Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the
> highest timestamp observed by stateful operators, per partition. This concept
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological
> order (as in database change data capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT
> applications, it's common for sensors to save up quite a bit of data and then
> dump it all at once into the topic. See
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
> for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a
> time into the topic. This results in a pattern in which, when reading a
> single partition, the operators observe a lot of consecutive records for one
> key that increase in timestamp for 24 hours, then a bunch of consecutive
> records for another key that are also increasing in timestamp over the same
> 24-hour period. With our current stream-time definition, this means that the
> partition's stream time increases while reading the first key's data, but
> then stays paused while reading the second key's data, since the second batch
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required
> to set the grace period to the maximum expected time skew (for example, 24
> hours), or Streams will just drop the second key's data as late. But this
> means that if they want to use Suppression for "final results", they have to
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents
> a logically independent sequence of events. Tracking by partition is simply
> convenient, but typically not logically meaningful. That is, the partitions
> are just physically independent sequences of events, so it's convenient to
> track stream time at this granularity. It would be just as correct, and more
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the
> testing/low-traffic problem. This is the opposite issue, where a partition
> doesn't get enough traffic to advance stream time and results remain "stuck"
> in the suppression buffers. We can provide some mechanism to force the
> advancement of time across all partitions, for use in testing when you want
> to flush out all results, or in production when some topic is low volume. We
> shouldn't consider tracking time _more_ granularly until this problem is
> solved, since it would just make the low-traffic problem worse.
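To make the difference concrete, a small Python model (not Kafka Streams code) can replay the A/B example from the description and compare per-partition stream time with the per-key variant the issue considers. It assumes a single partition and integer timestamps; `stream_times` and `lags` are illustrative names, and "lag" here means how far a record trails the stream time it is evaluated against.

```python
from typing import Iterable, List, Tuple

Record = Tuple[str, int]  # (key, timestamp)


def stream_times(records: Iterable[Record], per_key: bool) -> List[int]:
    """Stream time observed right after each record is processed.

    per_key=False: current behavior, one stream time per partition (highest timestamp seen).
    per_key=True: hypothetical alternative, an independent stream time for each key.
    """
    partition_time = None
    key_times = {}
    observed = []
    for key, ts in records:
        if per_key:
            key_times[key] = max(key_times.get(key, ts), ts)
            observed.append(key_times[key])
        else:
            partition_time = ts if partition_time is None else max(partition_time, ts)
            observed.append(partition_time)
    return observed


def lags(records: List[Record], per_key: bool) -> List[int]:
    """How far each record's timestamp trails the stream time it is evaluated against."""
    return [t - ts for (_, ts), t in zip(records, stream_times(records, per_key))]


# The example from the description: A dumps t0..t3, then B dumps t0..t3.
records = [("A", 0), ("A", 1), ("A", 2), ("A", 3),
           ("B", 0), ("B", 1), ("B", 2), ("B", 3)]
print(stream_times(records, per_key=False))  # [0, 1, 2, 3, 3, 3, 3, 3]
print(stream_times(records, per_key=True))   # [0, 1, 2, 3, 0, 1, 2, 3]
print(max(lags(records, per_key=False)))     # 3
print(max(lags(records, per_key=True)))      # 0
```

Under per-partition time, B@t0 trails stream time by the full skew of 3, so a grace period shorter than that risks dropping it; under per-key time no record trails. The same model also shows the drawback raised at the end of the description: with per_key=True, a key that stops producing never advances its own stream time, so anything buffered for it would stay stuck.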