Hey guys,

One thing I've always found super important for this kind of design work is
to do a really good job of cataloging the landscape of use cases and how
prevalent each one is. By that I mean not just listing lots of uses, but
also grouping them into categories that functionally need the same thing.
Without this, it is very hard to reason about design proposals.
From the proposals so far, I think we have had a lot of discussion around
possible APIs, but less around what the user needs for different use cases
and how they would implement that using the API.

Here is an example:
You aggregate click and impression data for a Reddit-like site. Every ten
minutes you want to output a ranked list of the top 10 articles, ranked by
clicks/impressions, for each geographical area. I want to be able to run this
in steady state as well as rerun it to regenerate results (or catch up if it
crashes).
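A minimal sketch of just the ranking step in this example, assuming the per-region click and impression counts for one ten-minute window have already been aggregated. `ArticleStats` and `TopArticles` are hypothetical names, not part of any Kafka API, and the sketch says nothing about when the ranking is triggered.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical per-article counters for one region and one ten-minute window.
class ArticleStats {
    final String articleId;
    final long clicks;
    final long impressions;

    ArticleStats(String articleId, long clicks, long impressions) {
        this.articleId = articleId;
        this.clicks = clicks;
        this.impressions = impressions;
    }

    // Click-through rate; an article with no impressions gets 0.0.
    double ctr() {
        return impressions == 0 ? 0.0 : (double) clicks / impressions;
    }
}

class TopArticles {
    // For each region, returns up to n article ids ordered by clicks/impressions,
    // highest first; ties are broken by article id so the output is deterministic.
    static Map<String, List<String>> topN(Map<String, List<ArticleStats>> statsByRegion, int n) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<ArticleStats>> entry : statsByRegion.entrySet()) {
            List<String> ranked = entry.getValue().stream()
                    .sorted(Comparator.comparingDouble(ArticleStats::ctr).reversed()
                            .thenComparing(s -> s.articleId))
                    .limit(n)
                    .map(s -> s.articleId)
                    .collect(Collectors.toList());
            result.put(entry.getKey(), ranked);
        }
        return result;
    }
}
```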

There are a couple of tricky things that seem to make this hard with either
of the options proposed:
1. If I emit this data using event time I have the problem described where
a geographical region with no new clicks or impressions will fail to output
results.
2. If I emit this data using system time, I have the problem that when
reprocessing data, a ten-minute window may cover 10 hours of data if my
processing is very fast, which dramatically changes the output.
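Point 2 contrasts with assigning each record to a tumbling window by its own event timestamp. A minimal sketch of that assignment, assuming timestamps in epoch milliseconds (the `EventTimeWindows` helper is hypothetical):

```java
import java.util.SortedMap;
import java.util.TreeMap;

class EventTimeWindows {
    // Start of the tumbling window of length sizeMs that contains eventTimeMs.
    // Math.floorMod keeps this correct for negative timestamps as well.
    static long windowStart(long eventTimeMs, long sizeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, sizeMs);
    }

    // Counts records per window using only their event timestamps, so the
    // result does not depend on how fast (or when) the input is processed.
    static SortedMap<Long, Integer> countPerWindow(long[] eventTimesMs, long sizeMs) {
        SortedMap<Long, Integer> counts = new TreeMap<>();
        for (long t : eventTimesMs) {
            counts.merge(windowStart(t, sizeMs), 1, Integer::sum);
        }
        return counts;
    }
}
```

Replaying the same timestamps at any speed yields the same map. On its own, this does not address point 1, that is, producing output when no new data arrives.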

Maybe a hybrid solution works: I window by event time but trigger results
by system time for windows that have been updated? I'm not really sure of
the details of making that work. Would that work? Are there concrete
examples where you actually want the current behavior?

-Jay


On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com> wrote:

> Hi All,
>
> Thanks for the KIP. We were also in need of a mechanism to trigger
> punctuate in the absence of events.
>
> As I described in
> <https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036>,
>
>    - Our approach involved using event time by default.
>    - The method that checks whether any punctuation is due in the
>    PunctuationQueue is triggered by any event received by the stream
>    thread, or at the polling intervals in the absence of any events.
>    - When we create Punctuate objects (which contain the next event time
>    for punctuation and the interval), we also record the creation time
>    (system time).
>    - While checking the maturity of the punctuate schedule in the
>    mayBePunctuate method, we also check whether the system clock has
>    advanced by the punctuate interval since the schedule's creation time.
>    - In the absence of any event, or in the absence of any event for one
>    topic in the partition group assigned to the stream task, the system
>    time will pass the interval, and we trigger a punctuate using the
>    expected punctuation event time.
>    - We then create the next punctuation schedule as punctuation event
>    time + punctuation interval (again recording the system time at which
>    the schedule was created).
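The hybrid punctuate described above can be sketched roughly as follows. This is not the actual PunctuationQueue code: `HybridPunctuationSchedule` and `maybePunctuate` are hypothetical names, stream time and wall-clock time are passed in explicitly to keep the sketch testable, and the callback always receives the expected punctuation event time, as in the description.

```java
import java.util.function.LongConsumer;

// Hypothetical hybrid punctuation schedule (not Kafka's PunctuationQueue):
// due when stream time reaches the scheduled event time, or when the punctuate
// interval has elapsed on the wall clock since the schedule was created.
class HybridPunctuationSchedule {
    private final long intervalMs;
    private long nextEventTimeMs;     // event time at which the next punctuation is expected
    private long createdWallClockMs;  // wall-clock time at which this schedule was (re)created

    HybridPunctuationSchedule(long firstEventTimeMs, long intervalMs, long nowWallClockMs) {
        this.intervalMs = intervalMs;
        this.nextEventTimeMs = firstEventTimeMs;
        this.createdWallClockMs = nowWallClockMs;
    }

    // Fires at most once per call and returns true if it did.
    boolean maybePunctuate(long streamTimeMs, long nowWallClockMs, LongConsumer punctuator) {
        boolean eventTimeDue = streamTimeMs >= nextEventTimeMs;
        boolean wallClockDue = nowWallClockMs - createdWallClockMs >= intervalMs;
        if (!eventTimeDue && !wallClockDue) {
            return false;
        }
        // Either way, report the expected punctuation event time.
        punctuator.accept(nextEventTimeMs);
        // Next schedule: event time + interval, recording when it was created.
        nextEventTimeMs += intervalMs;
        createdWallClockMs = nowWallClockMs;
        return true;
    }
}
```

If stream time jumps several intervals ahead, a caller would keep calling `maybePunctuate` until it returns false.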
>
> We call this a Hybrid Punctuate. Of course, this approach has pros and cons.
> Pros
>
>    - Punctuation happens at least once every <punctuate interval> of
>    system time.
>    - The semantics as a whole continue to revolve around event time.
>    - We can use old data (old timestamps) to rerun any experiments or
>    tests.
>
> Cons
>
>    - If the <punctuate interval> is not a time duration (say, logical
>    time or an event count), the approach might not be meaningful.
>    - If we have to wait for an actual event from a low-event-rate
>    partition in the partition group, this approach will jump the gun.
>    - If event processing cannot keep up with the event rate and events
>    with the expected timestamps stay queued for a long time, this
>    approach might jump the gun.
>
> I believe the above approach and discussion are close to approach A.
>
> -----------
>
> I like the idea of having an event-count-based punctuate.
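An event-count-based punctuation could be as simple as the following sketch, assuming a hook that is invoked once per processed record. `CountPunctuator` is a hypothetical name, not an existing API.

```java
// Hypothetical count-based punctuation: runs an action after every N records.
class CountPunctuator {
    private final long everyN;
    private final Runnable action;
    private long seenSinceLastPunctuation;

    CountPunctuator(long everyN, Runnable action) {
        if (everyN <= 0) {
            throw new IllegalArgumentException("everyN must be positive");
        }
        this.everyN = everyN;
        this.action = action;
    }

    // Call once per processed record; returns true if the action ran.
    boolean onRecord() {
        seenSinceLastPunctuation++;
        if (seenSinceLastPunctuation < everyN) {
            return false;
        }
        seenSinceLastPunctuation = 0;  // reset instead of using modulo, so the counter never overflows
        action.run();
        return true;
    }
}
```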
>
> -----------
>
> I agree with the discussion around approach C, that we should provide the
> user with the option to choose system time or event time based punctuates.
> But I believe that the user predominantly wants to use event time while not
> missing out on regular punctuates due to event delays or event absences.
> Hence a complex punctuate option as Matthias mentioned (quoted below) would
> be most apt.
>
> "- We might want to add "complex" schedules later on (like, punctuate on
> every 10 seconds event-time or 60 seconds system-time whatever comes
> first)."
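A sketch of such a "whichever comes first" schedule, assuming the runtime can supply both the current stream time and the wall-clock time on each check. `FirstOfSchedule` and `Trigger` are hypothetical names. Returning the trigger also tells the caller which condition fired. Here, firing on either condition resets both deadlines, which is one design choice among several.

```java
import java.util.Optional;

// Hypothetical: which clock caused a punctuation.
enum Trigger { EVENT_TIME, SYSTEM_TIME }

// Hypothetical composite schedule, e.g. 10 s of event time or 60 s of
// system time, whichever comes first.
class FirstOfSchedule {
    private final long eventIntervalMs;
    private final long systemIntervalMs;
    private long lastStreamTimeMs;
    private long lastWallClockMs;

    FirstOfSchedule(long eventIntervalMs, long systemIntervalMs,
                    long startStreamTimeMs, long startWallClockMs) {
        this.eventIntervalMs = eventIntervalMs;
        this.systemIntervalMs = systemIntervalMs;
        this.lastStreamTimeMs = startStreamTimeMs;
        this.lastWallClockMs = startWallClockMs;
    }

    // Returns the condition that fired, or empty if neither deadline has passed.
    Optional<Trigger> poll(long streamTimeMs, long wallClockMs) {
        Trigger fired = null;
        if (streamTimeMs - lastStreamTimeMs >= eventIntervalMs) {
            fired = Trigger.EVENT_TIME;
        } else if (wallClockMs - lastWallClockMs >= systemIntervalMs) {
            fired = Trigger.SYSTEM_TIME;
        }
        if (fired != null) {
            // Reset both deadlines so the next punctuation is measured from this one.
            lastStreamTimeMs = streamTimeMs;
            lastWallClockMs = wallClockMs;
        }
        return Optional.ofNullable(fired);
    }
}
```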
>
> -----------
>
> I think I read somewhere that Kafka Streams started with system time as the
> punctuation standard, but was later changed to event time. I guess there
> was a good reason behind that. As Kafka Streams evolves further on the
> stream processing front, I believe the emphasis on event time will remain
> quite strong.
>
>
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation, Tokyo
>
>
> On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com> wrote:
>
> > Yeah, I like PunctuationType much better; I just threw Time out there
> > more as a strawman than an actual suggestion ;) I still think it's
> > worth considering what this buys us over an additional callback. I
> > foresee a number of punctuate implementations following this pattern:
> >
> > public void punctuate(PunctuationType type) {
> >     switch (type) {
> >         case EVENT_TIME:
> >             methodA();
> >             break;
> >         case SYSTEM_TIME:
> >             methodB();
> >             break;
> >     }
> > }
> >
> > I guess one advantage of this approach is we could add additional
> > punctuation types later in a backwards compatible way (like event count
> > as you mentioned).
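A minimal sketch of how a `schedule(type, interval)` call could fit together with a single callback that is told which schedule fired, using the `PunctuationType` name discussed here. `TypedScheduler`, `TypedPunctuator` and `advance` are hypothetical. Each clock is advanced separately, and only the schedule registered for that type can fire. In this sketch, supporting another type would mostly mean adding an enum constant.

```java
import java.util.EnumMap;
import java.util.Map;

enum PunctuationType { EVENT_TIME, SYSTEM_TIME }

// Hypothetical single callback that is told which schedule fired.
interface TypedPunctuator {
    void punctuate(PunctuationType type, long timestampMs);
}

// Hypothetical scheduler holding at most one interval per punctuation type,
// mirroring the idea of calling schedule() once per type to mix approaches.
class TypedScheduler {
    private final Map<PunctuationType, Long> intervals = new EnumMap<>(PunctuationType.class);
    private final Map<PunctuationType, Long> lastFired = new EnumMap<>(PunctuationType.class);
    private final TypedPunctuator punctuator;

    TypedScheduler(TypedPunctuator punctuator) {
        this.punctuator = punctuator;
    }

    void schedule(PunctuationType type, long intervalMs, long nowMs) {
        intervals.put(type, intervalMs);
        lastFired.put(type, nowMs);
    }

    // Report progress of one clock; fires the matching schedule if it is due.
    void advance(PunctuationType type, long nowMs) {
        Long interval = intervals.get(type);
        if (interval == null) {
            return;  // nothing scheduled for this type
        }
        if (nowMs - lastFired.get(type) >= interval) {
            lastFired.put(type, nowMs);
            punctuator.punctuate(type, nowMs);
        }
    }
}
```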
> >
> > -Tommy
> >
> >
> > On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
> > > That sounds promising.
> > >
> > > I am just wondering if `Time` is the best name. Maybe we want to add
> > > other non-time-based punctuations at some point later. I would suggest
> > >
> > > enum PunctuationType {
> > >   EVENT_TIME,
> > >   SYSTEM_TIME,
> > > }
> > >
> > > or similar. Just to keep the door open -- it's easier to add new stuff
> > > if the name is more generic.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/4/17 5:30 AM, Thomas Becker wrote:
> > > >
> > > > I agree that the framework providing and managing the notion of
> > > > stream time is valuable and not something we would want to delegate
> > > > to the tasks. I'm not entirely convinced that a separate callback
> > > > (option C) is that messy (it could just be a default method with an
> > > > empty implementation), but if we wanted a single API to handle both
> > > > cases, how about something like the following?
> > > >
> > > > enum Time {
> > > >    STREAM,
> > > >    CLOCK
> > > > }
> > > >
> > > > Then on ProcessorContext:
> > > > context.schedule(Time time, long interval)
> > > > // We could allow this to be called once for each value of Time to mix
> > > > // approaches.
> > > >
> > > > Then the Processor API becomes:
> > > > punctuate(Time time)
> > > > // time here denotes which schedule resulted in this call.
> > > >
> > > > Thoughts?
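Thomas's point that a separate callback (option C) need not be messy can be illustrated with a default method, assuming Java 8 or later. The interface below is hypothetical and is not the real Kafka Streams `Processor`. It only shows that an implementation written before the extra callback existed keeps compiling.

```java
// Hypothetical processor interface; not the real Kafka Streams Processor.
interface SketchProcessor<K, V> {
    void process(K key, V value);

    // Existing stream-time callback.
    void punctuate(long streamTimeMs);

    // Option (C): an additional wall-clock callback. The empty default body means
    // implementations that ignore it do not have to change.
    default void punctuateWallClock(long wallClockMs) {
    }
}

// Written against the old contract; compiles unchanged after the new method is added.
class LegacyCounter implements SketchProcessor<String, String> {
    long count;

    @Override
    public void process(String key, String value) {
        count++;
    }

    @Override
    public void punctuate(long streamTimeMs) {
        // no-op in this sketch
    }
}
```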
> > > >
> > > >
> > > > On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
> > > > >
> > > > > Thanks a lot for the KIP Michal,
> > > > >
> > > > > I was thinking about the four options you proposed in more detail,
> > > > > and these are my thoughts:
> > > > >
> > > > > (A) You argue that users can still "punctuate" on event time via
> > > > > process(), but I am not sure this is possible. Note that users
> > > > > only get record timestamps via context.timestamp(). Thus, users
> > > > > would need to track the time progress per partition (based on the
> > > > > partitions they observe via context.partition()). (This alone puts
> > > > > a huge burden on the user.) However, users are not notified at
> > > > > startup which partitions are assigned, and users are not notified
> > > > > when partitions get revoked. Because this information is not
> > > > > available, it's not possible to "manually advance" stream time,
> > > > > and thus event-time punctuation within process() seems not to be
> > > > > possible -- or do you see a way to get it done? And even if there
> > > > > were, it might still be too clumsy to use.
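To make the difficulty in (A) concrete, here is a rough sketch of what tracking stream time by hand inside process() would involve. It assumes stream time is defined as the minimum, over the task's partitions, of the largest timestamp seen on each. `ManualStreamTime` is hypothetical. Its constructor needs the set of assigned partitions, and according to the paragraph above, the user is never told what that set is.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

class ManualStreamTime {
    private final Set<Integer> assignedPartitions;
    private final Map<Integer, Long> maxTimestampByPartition = new HashMap<>();

    // The user would have to know the assigned partitions up front (and update them on revocation).
    ManualStreamTime(Set<Integer> assignedPartitions) {
        this.assignedPartitions = new HashSet<>(assignedPartitions);
    }

    // Called from process() with context.partition() and context.timestamp().
    void observe(int partition, long timestampMs) {
        maxTimestampByPartition.merge(partition, timestampMs, Long::max);
    }

    // Empty until every assigned partition has delivered at least one record.
    OptionalLong streamTime() {
        if (assignedPartitions.isEmpty()) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (int partition : assignedPartitions) {
            Long ts = maxTimestampByPartition.get(partition);
            if (ts == null) {
                return OptionalLong.empty();
            }
            min = Math.min(min, ts);
        }
        return OptionalLong.of(min);
    }
}
```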
> > > > >
> > > > > (B) This does not allow mixing both approaches, thus limiting what
> > > > > users can do.
> > > > >
> > > > > (C) This should give all the flexibility we need. However, just
> > > > > adding one more method seems to be a solution that is too simple
> > > > > (cf. my comments below).
> > > > >
> > > > > (D) This might be hard to use. Also, I am not sure how a user could
> > > > > enable system-time and event-time punctuation in parallel.
> > > > >
> > > > >
> > > > >
> > > > > Overall, option (C) seems to be the most promising approach to me.
> > > > > Because I also favor a clean API, we might keep the current
> > > > > punctuate() as-is, but deprecate it -- so we can remove it at some
> > > > > later point, once people use the "new punctuate API".
> > > > >
> > > > >
> > > > > Couple of follow up questions:
> > > > >
> > > > > - I am wondering if we should have two callback methods or just one
> > > > > (i.e., a unified one for system- and event-time punctuation, or one
> > > > > for each?).
> > > > >
> > > > > - If we have one, how can the user figure out which condition
> > > > > triggered?
> > > > >
> > > > > - What would the API look like for registering different punctuate
> > > > > schedules? The "type" must be defined somehow.
> > > > >
> > > > > - We might want to add "complex" schedules later on (like,
> > > > > punctuate on every 10 seconds of event time or 60 seconds of system
> > > > > time, whichever comes first). I'm not saying we should add this
> > > > > right away, but we might want to define the API in a way that
> > > > > allows extensions like this later on, without redesigning the API
> > > > > (i.e., the API should be designed to be extensible).
> > > > >
> > > > > - Did you ever consider count-based punctuation?
> > > > >
> > > > >
> > > > > I understand that you would like to solve a simple problem, but we
> > > > > have learned from the past that just "adding some API" quickly
> > > > > leads to a poorly defined API that needs time-consuming cleanup
> > > > > later on via other KIPs. Thus, I would prefer a holistic
> > > > > punctuation KIP from the beginning, to avoid painful redesign later.
> > > > >
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > > > On 4/3/17 11:58 AM, Michal Borowiecki wrote:
> > > > > >
> > > > > >
> > > > > > Thanks Thomas,
> > > > > >
> > > > > > I'm also wary of changing the existing semantics of punctuate,
> > > > > > for backward compatibility reasons, although I like the
> > > > > > conceptual simplicity of that option.
> > > > > >
> > > > > > Adding a new method feels safer to me but, in a way, uglier. I
> > > > > > added this to the KIP now as option (C).
> > > > > >
> > > > > > The TimestampExtractor mechanism is actually more flexible, as it
> > > > > > allows you to return any value; you're not limited to event time
> > > > > > or system time (although I don't see an actual use case where you
> > > > > > might need anything other than those two). Hence I also proposed
> > > > > > the option to allow users to, effectively, decide what "stream
> > > > > > time" is for them given the presence or absence of messages, much
> > > > > > like they can decide what message time means for them using the
> > > > > > TimestampExtractor. What do you think about that? This is probably
> > > > > > the most flexible option but also the most complicated.
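One way to picture the option Michal describes is a user-supplied function, analogous in spirit to TimestampExtractor, that decides how stream time moves both when records arrive and when the task is idle. `StreamTimeDecider` and the two policies below are hypothetical and not part of Kafka Streams. The parameters are assumptions about what the runtime could pass in.

```java
// Hypothetical user hook: computes the new stream time from the current one,
// the latest record timestamp, and how long (wall clock) the task has been idle.
@FunctionalInterface
interface StreamTimeDecider {
    long decide(long currentStreamTimeMs, long latestRecordTimestampMs, long idleWallClockMs);
}

class StreamTimePolicies {
    // Pure event time: stream time only moves when records with later timestamps arrive.
    static final StreamTimeDecider EVENT_TIME_ONLY =
            (current, latest, idle) -> Math.max(current, latest);

    // Event time, but while no records arrive, advance by the wall-clock idle time
    // so that punctuation still happens. Never moves stream time backwards.
    static final StreamTimeDecider ADVANCE_WHEN_IDLE =
            (current, latest, idle) -> Math.max(current, latest + idle);
}
```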
> > > > > >
> > > > > > All comments appreciated.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Michal
> > > > > >
> > > > > >
> > > > > > On 03/04/17 19:23, Thomas Becker wrote:
> > > > > > >
> > > > > > >
> > > > > > > Although I fully agree we need a way to trigger periodic
> > > > > > > processing that is independent of whether and when messages
> > > > > > > arrive, I'm not sure I like the idea of changing the existing
> > > > > > > semantics across the board. What if we added an additional
> > > > > > > callback to Processor that can be scheduled similarly to
> > > > > > > punctuate() but is always called at fixed, wall-clock-based
> > > > > > > intervals? This way you wouldn't have to give up the notion of
> > > > > > > stream time to be able to do periodic processing.
> > > > > > >
> > > > > > > On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
> > > > > > > >
> > > > > > > >
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I have created a draft for KIP-138: Change punctuate semantics
> > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
> > > > > > > >
> > > > > > > > Appreciating that there can be different views on system-time
> > > > > > > > vs event-time semantics for punctuation, depending on the use
> > > > > > > > case and the importance of backwards compatibility of any such
> > > > > > > > change, I've left it quite open and hope to fill in more info
> > > > > > > > as the discussion progresses.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Michal
> > > > --
> > > >
> > > >
> > > >     Tommy Becker
> > > >
> > > >     Senior Software Engineer
> > > >
> > > >     O +1 919.460.4747
> > > >
> > > >     tivo.com
> > > >
> > > >
> > > > ________________________________
> > > >
> > > > This email and any attachments may contain confidential and
> > > > privileged material for the sole use of the intended recipient. Any
> > > > review, copying, or distribution of this email (or any attachments)
> > > > by others is prohibited. If you are not the intended recipient,
> > > > please contact the sender immediately and permanently delete this
> > > > email and any attachments. No employee or agent of TiVo Inc. is
> > > > authorized to conclude any binding agreement on behalf of TiVo Inc.
> > > > by email. Binding agreements with TiVo Inc. may only be made by a
> > > > signed written agreement.
> > > >
>
