Hi Jay, The hybrid solution is exactly what I expect and need for our use cases when dealing with telecom data.
Thanks
Tianji

On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:

> Hey guys,
>
> One thing I've always found super important for this kind of design work is to do a really good job of cataloging the landscape of use cases and how prevalent each one is. By that I mean not just listing lots of uses, but also grouping them into categories that functionally need the same thing. In the absence of this it is very hard to reason about design proposals. From the proposals so far I think we have a lot of discussion around possible APIs, but less around what the user needs for different use cases and how they would implement that using the API.
>
> Here is an example:
> You aggregate click and impression data for a Reddit-like site. Every ten minutes you want to output a ranked list of the top 10 articles ranked by clicks/impressions for each geographical area. I want to be able to run this in steady state as well as rerun to regenerate results (or catch up if it crashes).
>
> There are a couple of tricky things that seem to make this hard with either of the options proposed:
> 1. If I emit this data using event time I have the problem described where a geographical region with no new clicks or impressions will fail to output results.
> 2. If I emit this data using system time I have the problem that when reprocessing data my window may not be ten minutes but 10 hours if my processing is very fast, so it dramatically changes the output.
>
> Maybe a hybrid solution works: I window by event time but trigger results by system time for windows that have updated? Not really sure of the details of making that work. Does that work? Are there concrete examples where you actually want the current behavior?
>
> -Jay
>
> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com> wrote:
>
> > Hi All,
> >
> > Thanks for the KIP. We were also in need of a mechanism to trigger punctuate in the absence of events.
> >
> > As I described in [https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036],
> >
> > - Our approach involved using the event time by default.
> > - The method that checks whether any punctuate is ready in the PunctuationQueue is triggered by any event received by the stream thread, or at the polling intervals in the absence of any events.
> > - When we create Punctuate objects (which contain the next event time for punctuation and the interval), we also record the creation time (system time).
> > - While checking for maturity of the Punctuate schedule in the mayBePunctuate method, we also check whether the punctuate interval has elapsed on the system clock since the schedule creation time.
> > - In the absence of any event, or in the absence of any event for one topic in the partition group assigned to the stream task, the system time will exceed the interval and we trigger a punctuate using the expected punctuation event time.
> > - We then create the next punctuation schedule as punctuation event time + punctuation interval [again recording the system time of creation of the schedule].
> >
> > We call this a Hybrid Punctuate. Of course, this approach has pros and cons.
> >
> > Pros
> >
> > - Punctuates will happen at most <punctuate interval> apart in terms of system time.
> > - The semantics as a whole continue to revolve around event time.
> > - We can use the old data [old timestamps] to rerun any experiments or tests.
> >
> > Cons
> >
> > - In case the <punctuate interval> is not a time duration [say logical time/event count], then the approach might not be meaningful.
> > - If we have to wait for an actual event from a low event rate partition in the partition group, this approach will jump the gun.
> > - In case the event processing cannot catch up with the event rate and the events with the expected timestamp get queued for a long time, this approach might jump the gun.
> >
> > I believe the above approach and discussion come close to approach A.
> >
> > -----------
> >
> > I like the idea of having an event count based punctuate.
> >
> > -----------
> >
> > I agree with the discussion around approach C, that we should provide the user with the option to choose system time or event time based punctuates. But I believe that the user predominantly wants to use event time while not missing out on regular punctuates due to event delays or event absences. Hence a complex punctuate option as Matthias mentioned (quoted below) would be most apt.
> >
> > "- We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time whatever comes first)."
> >
> > -----------
> >
> > I think I read somewhere that Kafka Streams started with System Time as the punctuation standard, but was later changed to Event Time. I guess there would be some good reason behind it. As Kafka Streams wants to evolve more on the Stream Processing front, I believe the emphasis on event time would remain quite strong.
> >
> > With Regards,
> >
> > Arun Mathew
> > Yahoo! JAPAN Corporation, Tokyo
> >
> > On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com> wrote:
> >
> > > Yeah I like PunctuationType much better; I just threw Time out there more as a strawman than an actual suggestion ;) I still think it's worth considering what this buys us over an additional callback.
> > > I foresee a number of punctuate implementations following this pattern:
> > >
> > > public void punctuate(PunctuationType type) {
> > >     switch (type) {
> > >         case EVENT_TIME:
> > >             methodA();
> > >             break;
> > >         case SYSTEM_TIME:
> > >             methodB();
> > >             break;
> > >     }
> > > }
> > >
> > > I guess one advantage of this approach is we could add additional punctuation types later in a backwards compatible way (like event count as you mentioned).
> > >
> > > -Tommy
> > >
> > > On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
> > > > That sounds promising.
> > > >
> > > > I am just wondering if `Time` is the best name. Maybe we want to add other non-time based punctuations at some point later. I would suggest
> > > >
> > > > enum PunctuationType {
> > > >     EVENT_TIME,
> > > >     SYSTEM_TIME,
> > > > }
> > > >
> > > > or similar. Just to keep the door open -- it's easier to add new stuff if the name is more generic.
> > > >
> > > > -Matthias
> > > >
> > > > On 4/4/17 5:30 AM, Thomas Becker wrote:
> > > > >
> > > > > I agree that the framework providing and managing the notion of stream time is valuable and not something we would want to delegate to the tasks. I'm not entirely convinced that a separate callback (option C) is that messy (it could just be a default method with an empty implementation), but if we wanted a single API to handle both cases, how about something like the following?
> > > > >
> > > > > enum Time {
> > > > >     STREAM,
> > > > >     CLOCK
> > > > > }
> > > > >
> > > > > Then on ProcessorContext:
> > > > > context.schedule(Time time, long interval) // We could allow this to be called once for each value of time to mix approaches.
> > > > >
> > > > > Then the Processor API becomes:
> > > > > punctuate(Time time) // time here denotes which schedule resulted in this call.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
> > > > > >
> > > > > > Thanks a lot for the KIP Michal,
> > > > > >
> > > > > > I was thinking about the four options you proposed in more detail and these are my thoughts:
> > > > > >
> > > > > > (A) You argue that users can still "punctuate" on event-time via process(), but I am not sure if this is possible. Note that users only get record timestamps via context.timestamp(). Thus, users would need to track the time progress per partition (based on the partitions they observe via context.partition()). (This alone puts a huge burden on the user.) However, users are not notified at startup what partitions are assigned, and users are not notified when partitions get revoked. Because this information is not available, it's not possible to "manually advance" stream-time, and thus event-time punctuation within process() seems not to be possible -- or do you see a way to get it done? And even if so, it might still be too clumsy to use.
> > > > > >
> > > > > > (B) This does not allow mixing both approaches, thus limiting what users can do.
> > > > > >
> > > > > > (C) This should give all the flexibility we need. However, just adding one more method seems to be a solution that is too simple (cf. my comments below).
> > > > > >
> > > > > > (D) This might be hard to use.
> > > > > > Also, I am not sure how a user could enable system-time and event-time punctuation in parallel.
> > > > > >
> > > > > > Overall, option (C) seems to be the most promising approach to me. Because I also favor a clean API, we might keep current punctuate() as-is, but deprecate it -- so we can remove it at some later point when people use the "new punctuate API".
> > > > > >
> > > > > > Couple of follow up questions:
> > > > > >
> > > > > > - I am wondering if we should have two callback methods or just one (i.e., a unified one for system and event time punctuation, or one for each?).
> > > > > >
> > > > > > - If we have one, how can the user figure out which condition triggered it?
> > > > > >
> > > > > > - What would the API look like for registering different punctuate schedules? The "type" must be somehow defined?
> > > > > >
> > > > > > - We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time whatever comes first). I don't say we should add this right away, but we might want to define the API in a way that allows extensions like this later on, without redesigning the API (i.e., the API should be designed to be extensible).
> > > > > >
> > > > > > - Did you ever consider count-based punctuation?
> > > > > >
> > > > > > I understand that you would like to solve a simple problem, but we learned from the past that just "adding some API" quickly leads to a not very well defined API that needs time consuming clean up later on via other KIPs.
> > > > > > Thus, I would prefer to get a holistic punctuation KIP with this from the beginning on to avoid later painful redesign.
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 4/3/17 11:58 AM, Michal Borowiecki wrote:
> > > > > > >
> > > > > > > Thanks Thomas,
> > > > > > >
> > > > > > > I'm also wary of changing the existing semantics of punctuate, for backward compatibility reasons, although I like the conceptual simplicity of that option.
> > > > > > >
> > > > > > > Adding a new method to me feels safer but, in a way, uglier. I added this to the KIP now as option (C).
> > > > > > >
> > > > > > > The TimestampExtractor mechanism is actually more flexible, as it allows you to return any value; you're not limited to event time or system time (although I don't see an actual use case where you might need anything else than those two). Hence I also proposed the option to allow users to, effectively, decide what "stream time" is for them given the presence or absence of messages, much like they can decide what msg time means for them using the TimestampExtractor. What do you think about that? This is probably most flexible but also most complicated.
> > > > > > >
> > > > > > > All comments appreciated.
> > > > > > >
> > > > > > > Cheers,
> > > > > > >
> > > > > > > Michal
> > > > > > >
> > > > > > > On 03/04/17 19:23, Thomas Becker wrote:
> > > > > > > >
> > > > > > > > Although I fully agree we need a way to trigger periodic processing that is independent from whether and when messages arrive, I'm not sure I like the idea of changing the existing semantics across the board. What if we added an additional callback to Processor that can be scheduled similarly to punctuate() but was always called at fixed, wall clock based intervals? This way you wouldn't have to give up the notion of stream time to be able to do periodic processing.
> > > > > > > >
> > > > > > > > On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I have created a draft for KIP-138: Change punctuate semantics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
> > > > > > > > >
> > > > > > > > > Appreciating there can be different views on system-time vs event-time semantics for punctuation depending on use-case and the importance of backwards compatibility of any such change, I've left it quite open and hope to fill in more info as the discussion progresses.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Michal
> > > > > --
> > > > >
> > > > > Tommy Becker
> > > > > Senior Software Engineer
> > > > > O +1 919.460.4747
> > > > > tivo.com
> > > > >
> > > > > ________________________________
> > > > >
> > > > > This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.
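
For reference, the hybrid punctuation Arun describes above (schedule by event time, but fire anyway once the punctuate interval has elapsed on the system clock) can be sketched roughly as follows. This is only a minimal illustration under stated assumptions, not Kafka Streams code: the class name HybridPunctuationSchedule, its fields, and the mayBePunctuate signature are hypothetical, and the caller is assumed to supply the current stream time and system time itself.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a "hybrid" punctuation schedule.
 * Not part of the Kafka Streams API; all names are illustrative.
 */
final class HybridPunctuationSchedule {
    private final long intervalMs;
    // Expected event time of the next punctuation.
    private long nextEventTimeMs;
    // System time at which the current schedule entry was created.
    private long createdAtSystemMs;

    HybridPunctuationSchedule(long firstEventTimeMs, long intervalMs, long nowSystemMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
        this.nextEventTimeMs = firstEventTimeMs;
        this.createdAtSystemMs = nowSystemMs;
    }

    /**
     * Returns the event time to punctuate with, or empty if the schedule
     * has not matured. Fires at most once per call; a caller that wants
     * to catch up after a large jump in stream time can call it in a loop.
     */
    OptionalLong mayBePunctuate(long streamTimeMs, long nowSystemMs) {
        // Normal case: stream (event) time has reached the scheduled time.
        boolean eventTimeReached = streamTimeMs >= nextEventTimeMs;
        // Fallback: no events moved stream time forward, but a full
        // interval has passed on the system clock since schedule creation.
        boolean systemTimeElapsed = nowSystemMs - createdAtSystemMs >= intervalMs;
        if (!eventTimeReached && !systemTimeElapsed) {
            return OptionalLong.empty();
        }
        // Punctuate with the expected event time, then schedule the next
        // punctuation one interval later and record the new creation time.
        long punctuationTimeMs = nextEventTimeMs;
        nextEventTimeMs = punctuationTimeMs + intervalMs;
        createdAtSystemMs = nowSystemMs;
        return OptionalLong.of(punctuationTimeMs);
    }
}
```

In this sketch a caller would invoke mayBePunctuate on every received record and on every poll timeout, and run its punctuate logic with the returned event time whenever a value is present. The cons listed in the thread still apply: the system-time fallback can fire before a slow partition has delivered its events.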