I agree that there are multiple ways to avoid calling `System.currentTimeMillis()`. However, the KIP needs to define the public contract so that users know what behavior they can expect (the simplest option might be to say it's based on the `punctuation()` schedule -- not sure if this is desired or not).
Similarly, the question of "why should we flush on shutdown" is part of the contract, and multiple designs seem possible.

-Matthias

On 3/11/19 8:30 AM, John Roesler wrote:
> Hey, all, just chiming in to keep the discussion moving...
>
> Regarding whether to flush or not on shutdown, I'm curious why we *would*
> flush...
> The record cache does this, but that's because it's not durable. The
> suppression buffer is already backed by a changelog specifically so that it
> can provide exactly the timing you configure, and not have to emit early
> just because the commit interval is short or the task is migrated. So,
> regardless of the commit interval or application lifecycle, if I tell
> suppression to wait 5 minutes before emitting, it'll wait 5 minutes. It
> seems asymmetric for wall-clock suppression to behave differently.
>
> Regarding checking wall-clock time, yes, it can be expensive, but there are
> a number of ways we can cope with it without introducing a complicated
> algorithm:
> * use nano time
> * check the wall-clock once per batch and set it on the processor context
>   in org.apache.kafka.streams.processor.internals.StreamThread#runOnce (we
>   already check system time here anyway)
> * maybe just do the naive thing and measure the overhead. I.e., maybe we
>   should benchmark the implementation anyway to look for this or other
>   bottlenecks, and fix performance problems in the order they appear.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Mon, Feb 25, 2019 at 4:36 PM jonathangor...@newrelic.com <
> jonathangor...@newrelic.com> wrote:
>
>> On 2019/02/21 02:19:27, "Matthias J. Sax" <matth...@confluent.io> wrote:
>>> thanks for the KIP. Corner case question:
>>>
>>> What happens if an application is stopped and restarted?
>>>
>>> - Should suppress() flush all records (would be _before_ the time
>>>   elapsed)?
>>> - Or should it preserve buffered records and reload on restart? For
>>>   this case, should the record be flushed on reload (elapsed time is
>>>   unknown) or should we reset the timer to zero?
>>
>> My opinion is that we should aim for simplicity for the first
>> implementation of this feature: Flush all the records on shutdown. If
>> there's demand in the future for strict adherence on shutdown we can
>> implement it as extra params to the Suppressed api.
>>
>>> What is unclear to me atm, is the use-case you anticipate. If you assume
>>> a live run of an application, event-time and processing-time should be
>>> fairly identical (at least with regard to data rates). Thus, suppress()
>>> on event-time should give you about the same behavior as wall-clock
>>> time? If you disagree, can you elaborate?
>>
>> Imagine a session window where you aggregate 10K events that usually occur
>> within 2-3 seconds of each other (event time). However, they are ingested
>> in batches of 1000 or so, spread out over 2-3 minutes (ingest time), and
>> not necessarily in order. It's important for us to be able to publish this
>> aggregate in real-time as we get new data (every 10 seconds) to keep our
>> time to glass low, but our data store is non-updateable so we'd like to
>> limit the number of aggregates we publish.
>>
>> If you imagine a case where all the event batches arrive in reverse order
>> for one particular session window, then once the stream time advances past
>> the suppression threshold, we could publish an aggregate update for each
>> newly received event.
>>
>>> This leaves the case for data reprocessing, for which event-time advances
>>> much faster than wall-clock time. Is this the target use-case?
>>
>> No, see above.
>>
>>> About the implementation: checking wall-clock time is an expensive
>>> system call, so I am a little worried about run-time overhead. This seems
>>> not to be an implementation detail and thus, it might be worth
>>> including in the discussion. The question is how strict the guarantee
>>> about when records should be flushed should be. Assume you set a timer of 1
>>> second, and you have a data rate of 1000 records per second, with each
>>> record arriving one ms after the other, each with a different key. To
>>> flush this data "correctly" we would need to check wall-clock time every
>>> millisecond... Thoughts?
>>>
>>> (We don't need to dive into all details, but a high level discussion
>>> about the desired algorithm and guarantees would be good to have IMHO.)
>>
>> I had never dug into the performance characteristics of
>> currentTimeMillis() before:
>>
>> http://pzemtsov.github.io/2017/07/23/the-slow-currenttimemillis.html
>>
>> So if we assume the slow Linux average of 640 ns/call, at 1000 calls/sec
>> that's 0.64 ms. Doesn't seem terrible to me but I imagine for some use
>> cases 1M calls/sec might be reasonable and now we're up to 0.64s just in
>> system time checking. Perhaps we add some logic that calculates the rate of
>> data input and if it exceeds some threshold we only check the time every n
>> records? The trick there I suppose is for very bursty traffic you could
>> exceed and then wait too long to trigger another check. Maybe we store a
>> moving average? Or perhaps this is getting too complicated?
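
For illustration, the "only check the time every n records" idea from the quoted thread, combined with the "check once per batch" suggestion, could be sketched roughly as below. This is a minimal sketch and not Kafka Streams code: `SampledClock`, `checkEvery`, `millis()` and `refresh()` are hypothetical names invented here, and the clock source is injected so the behavior can be tested without real time passing.

```java
import java.util.function.LongSupplier;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class SampledClock {
    private final LongSupplier source;  // e.g. System::currentTimeMillis
    private final int checkEvery;       // re-read the source once per this many calls
    private int callsSinceRead = 0;
    private long cachedMillis;

    SampledClock(LongSupplier source, int checkEvery) {
        if (checkEvery < 1) {
            throw new IllegalArgumentException("checkEvery must be >= 1");
        }
        this.source = source;
        this.checkEvery = checkEvery;
        this.cachedMillis = source.getAsLong();
    }

    // Returns the cached time; every checkEvery-th call re-reads the source first.
    long millis() {
        if (++callsSinceRead >= checkEvery) {
            return refresh();
        }
        return cachedMillis;
    }

    // Forces a read of the source, e.g. once at the start of each batch.
    long refresh() {
        cachedMillis = source.getAsLong();
        callsSinceRead = 0;
        return cachedMillis;
    }
}
```

With `checkEvery` set to 1000 and 1M records/sec, the source would be read about 1000 times per second instead of 1M times. The trade-off is the one already raised in the thread: the returned time can be stale by up to `checkEvery - 1` records, which under bursty traffic can mean an arbitrarily long wall-clock delay unless something like `refresh()` is also called once per batch.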