Kohki,

From your use case, it seems that you'd like to have an "explicit" trigger
for when aggregate data can be forwarded downstream.

But before we dig deep into that feature, I'm wondering: for your monitoring
case, do you want to completely ignore any future updates once a record
has been sent downstream, i.e. to the alarming system? I can understand that
you want the alarming to be stateless, but I'm not sure whether you'd like to
stop getting any more data once its "state has been set".

What I'd propose for your case is to remember the data that triggers the
alarm, with a topology like:


KTable table = stream.aggregate(/* windowed aggregation */);

table.toStream().foreach(/* if a record indicates an anomaly, report it to
the alarming system with its value and the reporting time */);


----

So if you have an update stream from "table" as:

At time t0: window1 @ high value (should trigger alarm)
At time t1: window1 @ normal value (should not trigger alarm)

Then your "table" would contain the final updated results, while your
alarm system would just hold the subset of its changelog that kept all the
alarmed records, which can be interpreted as:

"window1 was abnormal as reported at t0" ..
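
To make that concrete, here is a minimal plain-Java sketch of the idea, with no Kafka Streams dependency: the "table" keeps only the latest value per window, while the alarm log keeps every update that crossed a threshold together with its reporting time. The class name AlarmLogSketch, the threshold of 100.0, and the sample values are assumptions made up for illustration; in a real topology the onUpdate logic would sit inside the foreach callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AlarmLogSketch {

    /** One alarmed update: which window, what value, and when it was reported. */
    public static final class Alarm {
        public final String window;
        public final double value;
        public final long reportedAt;

        Alarm(String window, double value, long reportedAt) {
            this.window = window;
            this.value = value;
            this.reportedAt = reportedAt;
        }

        @Override
        public String toString() {
            return window + " was abnormal (" + value + ") as reported at t" + reportedAt;
        }
    }

    private final double threshold;                            // hypothetical anomaly threshold
    private final Map<String, Double> table = new HashMap<>(); // latest value per window
    private final List<Alarm> alarmLog = new ArrayList<>();    // append-only alarmed updates

    public AlarmLogSketch(double threshold) {
        this.threshold = threshold;
    }

    /** Apply one update from the table's changelog. */
    public void onUpdate(String window, double value, long reportedAt) {
        table.put(window, value);              // the table always holds the newest result
        if (value > threshold) {               // only anomalous updates reach the alarm log
            alarmLog.add(new Alarm(window, value, reportedAt));
        }
    }

    /** Latest aggregate for a window, or null if the window was never updated. */
    public Double latest(String window) {
        return table.get(window);
    }

    /** Read-only view of every alarmed update seen so far. */
    public List<Alarm> alarms() {
        return Collections.unmodifiableList(alarmLog);
    }

    public static void main(String[] args) {
        AlarmLogSketch sketch = new AlarmLogSketch(100.0);
        sketch.onUpdate("window1", 250.0, 0L); // t0: high value, recorded as an alarm
        sketch.onUpdate("window1", 40.0, 1L);  // t1: late update with a normal value
        System.out.println("latest: " + sketch.latest("window1"));
        System.out.println("alarms: " + sketch.alarms());
    }
}
```

With this shape the alarm log is only ever appended to, so a later normal value for window1 changes the table but leaves the record of the t0 alarm intact.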


Guozhang


On Sun, Feb 26, 2017 at 10:29 AM, Kohki Nishio <tarop...@gmail.com> wrote:

> I'm planning to do aggregation over metrics, and when a 'flush' happens, it
> emits an aggregate to the downstream (e.g. alarming).
>
> Let's say the first message says some average number is very high and it
> triggers an alarm. Later on, a user comes to the system and checks the
> number, but it might already have been updated with a normal value due to a
> late update. This is quite difficult to manage: all of the downstream needs
> to be stateful. I'd like the alarming system to be stateless, and I wonder
> how this should be handled.
>
> -Kohki
>
>
>
>
> On Fri, Feb 24, 2017 at 2:39 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > First, I want to mention that you do not see "duplicates" -- you see late
> > updates. Kafka Streams embraces "change" and there is no such thing as a
> > final aggregate, but each agg output record is an update/refinement of
> > the result.
> >
> > Strict filtering of "late updates" is hard in Kafka Streams.
> >
> > If you want to have such filtering, you would need to use
> >
> > aggregate(...).toStream().transform()
> >
> > with an attached state store for transform() to implement this filter
> > manually. The state holds all emitted records per key. When a record
> > arrives, you check whether it is in the state or not. If not, you add it
> > to the state and emit it. If yes, you just drop the record.
> >
> > However, this will still not be perfect, because each time a commit is
> > triggered, the current window is flushed even if "stream time" did not
> > pass "window end" timestamp -- thus, the window is not completed yet.
> >
> > Thus, you would also need to consider the current "stream time", which you
> > can indirectly access via punctuate(). For incomplete windows, you
> > might want to filter out those "intermediate results" and not add them to
> > the store. This is hard to get right (I am not even sure if it is possible
> > at all to get right).
> >
> > Even if this works, however, it will only give you no duplicates (in
> > the strong sense of duplicate) as long as no error occurs. Kafka Streams
> > does not (yet) support exactly-once processing and thus, in case of a
> > failure, you might get duplicate outputs.
> >
> > I am not sure what kind of alerting you are doing, but you should
> > remember in some other way whether you already raised an alert, and if a
> > late update (or real duplicate) occurs, not alert a second time.
> >
> > Hope this helps.
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 2/24/17 2:16 PM, Jozef.koval wrote:
> > > Hi Kohki,
> > >
> > > Kafka Streams windows use so-called "segments" internally, and their
> > > retention time cannot be lower than some minimum. Your configuration is
> > > set to less than this minimum and is therefore not accepted. Even the
> > > Windows#until javadoc specifies it:
> > >
> > > * Set the window maintain duration (retention time) in milliseconds.
> > >
> > > * This retention time is a guaranteed <i>lower bound</i> for how long a
> > > window will be maintained.
> > >
> > > For more info consider reading
> > > [this](https://github.com/confluentinc/examples/issues/76) issue.
> > >
> > > Regards, Jozef
> > >
> > >
> > >
> > >
> > >
> > > -------- Original Message --------
> > > Subject: Re: Immutable Record with Kafka Stream
> > > Local Time: February 24, 2017 7:11 PM
> > > UTC Time: February 24, 2017 7:11 PM
> > > From: tarop...@gmail.com
> > > To: users@kafka.apache.org
> > >
> > > Guozhang, thanks for the reply, but I'm having trouble understanding.
> > > Here's the statement from the documentation:
> > >
> > >> Windowing operations are available in the Kafka Streams DSL
> > >> <http://docs.confluent.io/3.0.0/streams/developer-guide.html#streams-developer-guide-dsl>,
> > >> where users can specify a *retention period* for the window. This
> > >> allows Kafka Streams to retain old window buckets for a period of time
> > >> in order to wait for the late arrival of records whose timestamps fall
> > >> within the window interval. If a record arrives after the retention
> > >> period has passed, the record cannot be processed and is dropped.
> > >
> > >
> > > And I believe I can set the retention period by using 'until':
> > >
> > > TimeWindows.of(60000).until(60000)
> > >
> > >
> > > After receiving data from 00:06:00, I don't know why it still continues
> > > receiving data from 00:00:00. What is 'until' supposed to do?
> > >
> > > Thanks
> > > -Kohki
> > >
> >
> >
>
>
> --
> Kohki Nishio
>



-- 
-- Guozhang
