I'm planning to aggregate metrics, and when a 'flush' happens, the aggregate is emitted downstream (e.g. to alarming).
Let's say the first message reports that some average is very high and triggers an alarm. When the user later comes to the system and checks the number, it might already have been replaced with a normal value by a late update. This is quite difficult to manage, because every downstream system would need to be stateful. I'd like the alarming system to be stateless, so I wonder how this should be handled.

-Kohki

On Fri, Feb 24, 2017 at 2:39 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> First, I want to mention that you do not see "duplicates" -- you see late
> updates. Kafka Streams embraces "change" and there is no such thing as a
> final aggregate, but each agg output record is an update/refinement of
> the result.
>
> Strict filtering of "late updates" is hard in Kafka Streams.
>
> If you want to have such filtering, you would need to use
>
> aggregate(...).toStream().transform()
>
> with an attached state for transform() to implement this filter
> manually. The state holds all emitted records per key. If a record
> arrives, you check whether it is in the state or not. If not, you add it
> to the state and emit it. If yes, you just drop the record.
>
> However, this will still not be perfect, because each time a commit is
> triggered, the current window is flushed even if "stream time" did not
> pass the "window end" timestamp -- thus, the window is not completed yet.
>
> Thus, you would also need to consider the current "stream time", which
> you can indirectly access via .punctuate(). For incomplete windows you
> might want to filter those "intermediate results" and not add them to the
> store. This is hard to get right (I am not even sure if it is possible
> to get right at all).
>
> Even if this works, however, it will only give you no duplicates (in
> the strong sense of duplicate) as long as no error occurs. Kafka Streams
> does not (yet) support exactly-once processing and thus, in case of a
> failure, you might get duplicate outputs.
>
> I am not sure what kind of alerting you are doing, but you should
> remember in some other way whether you already raised an alert, and if a
> late update (or real duplicate) occurs, not alert a second time.
>
> Hope this helps.
>
> -Matthias
>
> On 2/24/17 2:16 PM, Jozef.koval wrote:
> > Hi Kohki,
> >
> > Kafka Streams windows use so-called "segments" internally, and their
> > retention time cannot be lower than some minimum. Your configuration is
> > set to less than this minimum and is therefore not accepted. Even the
> > Windows#until javadoc specifies it:
> >
> > * Set the window maintain duration (retention time) in milliseconds.
> >
> > * This retention time is a guaranteed <i>lower bound</i> for how long a
> > window will be maintained.
> >
> > For more info consider reading
> > [this](https://github.com/confluentinc/examples/issues/76) issue.
> >
> > Regards, Jozef
> >
> > -------- Original Message --------
> > Subject: Re: Immutable Record with Kafka Stream
> > Local Time: February 24, 2017 7:11 PM
> > UTC Time: February 24, 2017 7:11 PM
> > From: tarop...@gmail.com
> > To: users@kafka.apache.org
> >
> > Guozhang, thanks for the reply, but I'm having trouble understanding.
> > Here's the statement from the documentation:
> >
> >> Windowing operations are available in the Kafka Streams DSL
> >> <http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>,
> >> where users can specify a *retention period* for the window. This allows
> >> Kafka Streams to retain old window buckets for a period of time in order to
> >> wait for the late arrival of records whose timestamps fall within the
> >> window interval. If a record arrives after the retention period has passed,
> >> the record cannot be processed and is dropped.
> >
> > And I believe I can set the retention period by using 'until':
> >
> > TimeWindows.of(60000).until(60000)
> >
> > After receiving data from 00:06:00, I don't know why it still continues
> > to receive data from 00:00:00. What is 'until' supposed to do?
> >
> > Thanks
> > -Kohki

--
Kohki Nishio
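
For reference, here is a minimal sketch of the filter Matthias describes above, written as plain Java with no Kafka dependency so that it runs on its own. The class name LateUpdateFilter, the method shouldEmit, and the "metric@windowStart" key format are all hypothetical and only for illustration. In a real topology this logic would sit inside transform() with an attached state store, and "stream time" would have to be obtained from the processor rather than passed in as an argument.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Plain-Java model of an "emit each window result at most once" filter.
 * Hypothetical illustration only: no Kafka classes are used, and the
 * in-memory set stands in for a Kafka Streams state store.
 */
public class LateUpdateFilter {
    // Keys ("metric@windowStart") that were already forwarded downstream.
    private final Set<String> emitted = new HashSet<>();
    private final long windowSizeMs;

    public LateUpdateFilter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /**
     * Returns true if this aggregate update should be forwarded downstream.
     * Updates for windows that are still open are dropped and not remembered;
     * for a closed window only the first update is forwarded.
     */
    public boolean shouldEmit(String metric, long windowStartMs, long streamTimeMs) {
        boolean windowClosed = streamTimeMs >= windowStartMs + windowSizeMs;
        if (!windowClosed) {
            return false; // intermediate result, e.g. flushed by a commit
        }
        // Set.add returns false if the key was already present (a late update).
        return emitted.add(metric + "@" + windowStartMs);
    }

    public static void main(String[] args) {
        LateUpdateFilter filter = new LateUpdateFilter(60_000L);
        System.out.println(filter.shouldEmit("cpu", 0L, 30_000L));  // false: window still open
        System.out.println(filter.shouldEmit("cpu", 0L, 60_000L));  // true: first result after window end
        System.out.println(filter.shouldEmit("cpu", 0L, 360_000L)); // false: late update is dropped
    }
}
```

Note the trade-off in this sketch: the alarm only ever sees the first value emitted after the window end, so a late record that would have changed the aggregate is ignored. The set also grows without bound here; a real implementation would need to expire old windows, and, as noted in the thread, it would still not protect against duplicates after a failure.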