Hi Jay,

The hybrid solution is exactly what I expect and need for our use cases
when dealing with telecom data.

Thanks
Tianji

On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey guys,
>
> One thing I've always found super important for this kind of design work is
> to do a really good job of cataloging the landscape of use cases and how
> prevalent each one is. By that I mean not just listing lots of uses, but
> also grouping them into categories that functionally need the same thing.
> In the absence of this it is very hard to reason about design proposals.
> From the proposals so far I think we have a lot of discussion around
> possible apis, but less around what the user needs for different use cases
> and how they would implement that using the api.
>
> Here is an example:
> You aggregate click and impression data for a Reddit-like site. Every ten
> minutes you want to output a ranked list of the top 10 articles ranked by
> clicks/impressions for each geographical area. I want to be able to run this
> in steady state as well as rerun to regenerate results (or catch up if it
> crashes).
>
> There are a couple of tricky things that seem to make this hard with either
> of the options proposed:
> 1. If I emit this data using event time I have the problem described where
> a geographical region with no new clicks or impressions will fail to output
> results.
> 2. If I emit this data using system time I have the problem that when
> reprocessing data my window may not be ten minutes but 10 hours if my
> processing is very fast so it dramatically changes the output.
>
> Maybe a hybrid solution works: I window by event time but trigger results
> by system time for windows that have updated? I'm not really sure about the
> details of making that work. Does that work? Are there concrete examples
> where you actually want the current behavior?
>
> -Jay
>
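A minimal sketch of the hybrid idea above, assuming millisecond timestamps and a simple per-window counter. HybridWindowSketch, onRecord, and onSystemTimeTick are hypothetical names for illustration and are not part of the Kafka Streams API. Records are assigned to windows by event time only, and a wall-clock trigger emits just the windows that were updated since the previous tick.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: event-time windows, system-time emission. */
class HybridWindowSketch {
    private final long windowSizeMs;
    // Window start (event time) -> record count, kept sorted by window start.
    private final Map<Long, Long> countsByWindowStart = new TreeMap<>();
    // Windows that received at least one record since the last tick.
    private final Set<Long> dirtyWindows = new HashSet<>();

    HybridWindowSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Assigns the record to a window using its event timestamp only. */
    void onRecord(long eventTimeMs) {
        long windowStart = eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
        countsByWindowStart.merge(windowStart, 1L, Long::sum);
        dirtyWindows.add(windowStart);
    }

    /** Called by a wall-clock trigger; returns "windowStart=count" for updated windows. */
    List<String> onSystemTimeTick() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : countsByWindowStart.entrySet()) {
            if (dirtyWindows.contains(e.getKey())) {
                out.add(e.getKey() + "=" + e.getValue());
            }
        }
        dirtyWindows.clear();
        return out;
    }
}
```

Because window assignment never consults the wall clock, fast reprocessing of old data lands records in the same windows; only the emission cadence differs.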
>
> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > Thanks for the KIP. We were also in need of a mechanism to trigger
> > punctuate in the absence of events.
> >
> > As I described in [
> > https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036
> > ],
> >
> >    - Our approach involved using the event time by default.
> >    - The method to check if there is any punctuate ready in the
> >    PunctuationQueue is triggered via any event received by the stream
> >    thread, or at the polling intervals in the absence of any events.
> >    - When we create Punctuate objects (which contain the next event time
> >    for punctuation and the interval), we also record the creation time
> >    (system time).
> >    - While checking for maturity of the Punctuate schedule in the
> >    mayBePunctuate method, we also check if the system clock has elapsed the
> >    punctuate interval since the schedule creation time.
> >    - In the absence of any event, or in the absence of any event for one
> >    topic in the partition group assigned to the stream task, the system
> >    time will elapse the interval and we trigger a punctuate using the
> >    expected punctuation event time.
> >    - We then create the next punctuation schedule as punctuation event
> >    time + punctuation interval [again recording the system time of
> >    creation of the schedule].
> >
> > We call this a Hybrid Punctuate. Of course, this approach has pros and
> > cons.
> > Pros
> >
> >    - Punctuates will happen at most <punctuate interval> apart in
> >    terms of system time.
> >    - The semantics as a whole continue to revolve around event time.
> >    - We can use the old data [old timestamps] to rerun any experiments or
> >    tests.
> >
> > Cons
> >
> >    - In case the <punctuate interval> is not a time duration [say logical
> >    time/event count], the approach might not be meaningful.
> >    - In case we have to wait for an actual event from a low event rate
> >    partition in the partition group, this approach will jump the gun.
> >    - In case the event processing cannot catch up with the event rate and
> >    the events with the expected timestamp get queued for a long time, this
> >    approach might jump the gun.
> >
> > I believe the above approach and discussion come close to approach A.
> >
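The hybrid punctuate steps listed above can be sketched roughly as follows. This is an illustration under assumptions, not the actual patch: HybridPunctuationSchedule and its method names are hypothetical, and all times are plain millisecond values passed in by the caller. A punctuate fires when either stream time reaches the expected event time or a full interval of system time has elapsed since the schedule was created, and it always reports the expected event time.

```java
import java.util.OptionalLong;

/** Hypothetical sketch of one hybrid punctuation schedule. */
class HybridPunctuationSchedule {
    private final long intervalMs;
    private long nextEventTimeMs;   // expected event time of the next punctuation
    private long createdAtSystemMs; // system time when this schedule entry was created

    HybridPunctuationSchedule(long firstEventTimeMs, long intervalMs, long nowSystemMs) {
        this.nextEventTimeMs = firstEventTimeMs;
        this.intervalMs = intervalMs;
        this.createdAtSystemMs = nowSystemMs;
    }

    /**
     * Returns the punctuation event time if the schedule has matured,
     * either by event time or by elapsed system time; otherwise empty.
     */
    OptionalLong maybePunctuate(long streamTimeMs, long nowSystemMs) {
        boolean eventTimeDue = streamTimeMs >= nextEventTimeMs;
        boolean systemTimeDue = nowSystemMs - createdAtSystemMs >= intervalMs;
        if (!eventTimeDue && !systemTimeDue) {
            return OptionalLong.empty();
        }
        long fired = nextEventTimeMs;
        // Next schedule: punctuation event time + interval, recording the new creation time.
        nextEventTimeMs = fired + intervalMs;
        createdAtSystemMs = nowSystemMs;
        return OptionalLong.of(fired);
    }
}
```

Calling maybePunctuate both on every received event and on every poll timeout gives the "whichever matures first" behavior described in the list.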
> > -----------
> >
> > I like the idea of having an event-count-based punctuate.
> >
> > -----------
> >
> > I agree with the discussion around approach C, that we should provide the
> > user with the option to choose system time or event time based punctuates.
> > But I believe that the user predominantly wants to use event time while not
> > missing out on regular punctuates due to event delays or event absences.
> > Hence a complex punctuate option as Matthias mentioned (quoted below) would
> > be most apt.
> >
> > "- We might want to add "complex" schedules later on (like, punctuate on
> > every 10 seconds event-time or 60 seconds system-time whatever comes
> > first)."
> >
> > -----------
> >
> > I think I read somewhere that Kafka Streams started with System Time as the
> > punctuation standard, but was later changed to Event Time. I guess there
> > would be some good reason behind it. As Kafka Streams wants to evolve more
> > on the Stream Processing front, I believe the emphasis on event time would
> > remain quite strong.
> >
> >
> > With Regards,
> >
> > Arun Mathew
> > Yahoo! JAPAN Corporation, Tokyo
> >
> >
> > On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com> wrote:
> >
> > > Yeah I like PunctuationType much better; I just threw Time out there
> > > more as a strawman than an actual suggestion ;) I still think it's
> > > worth considering what this buys us over an additional callback. I
> > > foresee a number of punctuate implementations following this pattern:
> > >
> > > public void punctuate(PunctuationType type) {
> > >     switch (type) {
> > >         case EVENT_TIME:
> > >             methodA();
> > >             break;
> > >         case SYSTEM_TIME:
> > >             methodB();
> > >             break;
> > >     }
> > > }
> > >
> > > I guess one advantage of this approach is we could add additional
> > > punctuation types later in a backwards compatible way (like event count
> > > as you mentioned).
> > >
> > > -Tommy
> > >
> > >
> > > On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
> > > > That sounds promising.
> > > >
> > > > I am just wondering if `Time` is the best name. Maybe we want to add
> > > > other non-time based punctuations at some point later. I would suggest
> > > >
> > > > enum PunctuationType {
> > > >   EVENT_TIME,
> > > >   SYSTEM_TIME,
> > > > }
> > > >
> > > > or similar. Just to keep the door open -- it's easier to add new stuff
> > > > if the name is more generic.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 4/4/17 5:30 AM, Thomas Becker wrote:
> > > > >
> > > > > > I agree that the framework providing and managing the notion of stream
> > > > > > time is valuable and not something we would want to delegate to the
> > > > > > tasks. I'm not entirely convinced that a separate callback (option C)
> > > > > > is that messy (it could just be a default method with an empty
> > > > > > implementation), but if we wanted a single API to handle both cases,
> > > > > > how about something like the following?
> > > > > >
> > > > > > enum Time {
> > > > > >    STREAM,
> > > > > >    CLOCK
> > > > > > }
> > > > > >
> > > > > > Then on ProcessorContext:
> > > > > > context.schedule(Time time, long interval)  // We could allow this to be called once for each value of time to mix approaches.
> > > > > >
> > > > > > Then the Processor API becomes:
> > > > > > punctuate(Time time) // time here denotes which schedule resulted in this call.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > >
> > > > > On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
> > > > > >
> > > > > > Thanks a lot for the KIP Michal,
> > > > > >
> > > > > > > I was thinking about the four options you proposed in more detail,
> > > > > > > and these are my thoughts:
> > > > > >
> > > > > > > (A) You argue that users can still "punctuate" on event-time via
> > > > > > > process(), but I am not sure if this is possible. Note that users only
> > > > > > > get record timestamps via context.timestamp(). Thus, users would need to
> > > > > > > track the time progress per partition (based on the partitions they
> > > > > > > observe via context.partition()). (This alone puts a huge burden on the
> > > > > > > user.) However, users are not notified at startup which partitions are
> > > > > > > assigned, and users are not notified when partitions get revoked.
> > > > > > > Because this information is not available, it's not possible to
> > > > > > > "manually advance" stream-time, and thus event-time punctuation within
> > > > > > > process() seems not to be possible -- or do you see a way to get it
> > > > > > > done? And even if there is one, it might still be too clumsy to use.
> > > > > >
> > > > > > > (B) This does not allow mixing both approaches, thus limiting what
> > > > > > > users can do.
> > > > > > >
> > > > > > > (C) This should give all the flexibility we need. However, just adding
> > > > > > > one more method seems to be a solution that is too simple (cf. my
> > > > > > > comments below).
> > > > > > >
> > > > > > > (D) This might be hard to use. Also, I am not sure how a user could
> > > > > > > enable system-time and event-time punctuation in parallel.
> > > > > >
> > > > > >
> > > > > >
> > > > > > > Overall, option (C) seems to be the most promising approach to me.
> > > > > > > Because I also favor a clean API, we might keep the current punctuate()
> > > > > > > as-is, but deprecate it -- so we can remove it at some later point when
> > > > > > > people use the "new punctuate API".
> > > > > >
> > > > > >
> > > > > > > A couple of follow-up questions:
> > > > > > >
> > > > > > > - I am wondering if we should have two callback methods or just one
> > > > > > > (i.e., a unified one for system and event time punctuation, or one for
> > > > > > > each?).
> > > > > > >
> > > > > > > - If we have one, how can the user figure out which condition triggered
> > > > > > > it?
> > > > > > >
> > > > > > > - What would the API look like for registering different punctuate
> > > > > > > schedules? The "type" must be defined somehow?
> > > > > > >
> > > > > > > - We might want to add "complex" schedules later on (like, punctuate on
> > > > > > > every 10 seconds event-time or 60 seconds system-time whatever comes
> > > > > > > first). I don't say we should add this right away, but we might want to
> > > > > > > define the API in a way that allows extensions like this later on,
> > > > > > > without redesigning the API (i.e., the API should be designed to be
> > > > > > > extensible).
> > > > > > >
> > > > > > > - Did you ever consider count-based punctuation?
> > > > > >
> > > > > >
> > > > > > > I understand that you would like to solve a simple problem, but we
> > > > > > > learned from the past that just "adding some API" quickly leads to a
> > > > > > > not very well defined API that needs time-consuming cleanup later on
> > > > > > > via other KIPs. Thus, I would prefer to get a holistic punctuation KIP
> > > > > > > with this from the beginning to avoid a painful redesign later.
> > > > > >
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 4/3/17 11:58 AM, Michal Borowiecki wrote:
> > > > > > >
> > > > > > >
> > > > > > > Thanks Thomas,
> > > > > > >
> > > > > > > > I'm also wary of changing the existing semantics of punctuate, for
> > > > > > > > backward compatibility reasons, although I like the conceptual
> > > > > > > > simplicity of that option.
> > > > > > > >
> > > > > > > > Adding a new method to me feels safer but, in a way, uglier. I added
> > > > > > > > this to the KIP now as option (C).
> > > > > > > >
> > > > > > > > The TimestampExtractor mechanism is actually more flexible, as it
> > > > > > > > allows you to return any value; you're not limited to event time or
> > > > > > > > system time (although I don't see an actual use case where you might
> > > > > > > > need anything else than those two). Hence I also proposed the option to
> > > > > > > > allow users to, effectively, decide what "stream time" is for them
> > > > > > > > given the presence or absence of messages, much like they can decide
> > > > > > > > what msg time means for them using the TimestampExtractor. What do you
> > > > > > > > think about that? This is probably most flexible but also most
> > > > > > > > complicated.
> > > > > > >
> > > > > > > All comments appreciated.
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Michal
> > > > > > >
> > > > > > >
> > > > > > > On 03/04/17 19:23, Thomas Becker wrote:
> > > > > > > >
> > > > > > > >
> > > > > > > > > Although I fully agree we need a way to trigger periodic processing
> > > > > > > > > that is independent from whether and when messages arrive, I'm not
> > > > > > > > > sure I like the idea of changing the existing semantics across the
> > > > > > > > > board. What if we added an additional callback to Processor that can
> > > > > > > > > be scheduled similarly to punctuate() but was always called at fixed,
> > > > > > > > > wall clock based intervals? This way you wouldn't have to give up the
> > > > > > > > > notion of stream time to be able to do periodic processing.
> > > > > > > >
> > > > > > > > On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > > I have created a draft for KIP-138: Change punctuate semantics
> > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
> > > > > > > > > >
> > > > > > > > > > Appreciating there can be different views on system-time vs
> > > > > > > > > > event-time semantics for punctuation depending on use-case and the
> > > > > > > > > > importance of backwards compatibility of any such change, I've left
> > > > > > > > > > it quite open and hope to fill in more info as the discussion
> > > > > > > > > > progresses.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michal
> > > > > --
> > > > >
> > > > >
> > > > >     Tommy Becker
> > > > >
> > > > >     Senior Software Engineer
> > > > >
> > > > >     O +1 919.460.4747
> > > > >
> > > > >     tivo.com
> > > > >
> > > > >
> > > > > ________________________________
> > > > >
> > > > > This email and any attachments may contain confidential and
> > > > > privileged material for the sole use of the intended recipient. Any
> > > > > review, copying, or distribution of this email (or any attachments)
> > > > > by others is prohibited. If you are not the intended recipient,
> > > > > please contact the sender immediately and permanently delete this
> > > > > email and any attachments. No employee or agent of TiVo Inc. is
> > > > > authorized to conclude any binding agreement on behalf of TiVo Inc.
> > > > > by email. Binding agreements with TiVo Inc. may only be made by a
> > > > > signed written agreement.
> > > > >
> >
>
