Please share your Wiki-ID and a committer can give you write access.

Btw: as you did not initiate the KIP, you should not change the KIP without the permission of the original author -- in this case Michal.

So you might also just share your thoughts over the mailing list and Michal can update the KIP page. Or, as an alternative, just create a subpage for the KIP page.

@Michal: WDYT?


-Matthias

On 4/6/17 8:05 PM, Arun Mathew wrote:
> Hi Jay,
> Thanks for the advice. I would like to list down the use cases as
> per your suggestion. But it seems I don't have write permission to the
> Apache Kafka Confluence Space. Whom shall I request for it?
>
> Regarding your last question. We are using a patch in our production system
> which does exactly this.
> We window by the event time, but trigger punctuate in <punctuate interval>
> duration of system time, in the absence of an event crossing the punctuate
> event time.
>
> We are using Kafka Streams for our Audit Trail, where we need to output the
> event counts on each topic on each cluster aggregated over a 1 minute
> window. We have to use event time to be able to cross check the counts. But
> we need to trigger punctuate [aggregate event pushes] by system time in the
> absence of events. Otherwise the event counts for unexpired windows would
> be 0 which is bad.
>
> "Maybe a hybrid solution works: I window by event time but trigger results
> by system time for windows that have updated? Not really sure the details
> of making that work. Does that work? Are there concrete examples where you
> actually want the current behavior?"
>
> --
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation
>
> On Wed, Apr 5, 2017 at 8:48 PM, Tianji Li <skyah...@gmail.com> wrote:
>
>> Hi Jay,
>>
>> The hybrid solution is exactly what I expect and need for our use cases
>> when dealing with telecom data.
>>
>> Thanks
>> Tianji
>>
>> On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey guys,
>>>
>>> One thing I've always found super important for this kind of design work is
>>> to do a really good job of cataloging the landscape of use cases and how
>>> prevalent each one is.
>>> By that I mean not just listing lots of uses, but
>>> also grouping them into categories that functionally need the same thing.
>>> In the absence of this it is very hard to reason about design proposals.
>>> From the proposals so far I think we have a lot of discussion around
>>> possible apis, but less around what the user needs for different use cases
>>> and how they would implement that using the api.
>>>
>>> Here is an example:
>>> You aggregate click and impression data for a reddit like site. Every ten
>>> minutes you want to output a ranked list of the top 10 articles ranked by
>>> clicks/impressions for each geographical area. I want to be able to run this
>>> in steady state as well as rerun to regenerate results (or catch up if it
>>> crashes).
>>>
>>> There are a couple of tricky things that seem to make this hard with either
>>> of the options proposed:
>>> 1. If I emit this data using event time I have the problem described where
>>> a geographical region with no new clicks or impressions will fail to output
>>> results.
>>> 2. If I emit this data using system time I have the problem that when
>>> reprocessing data my window may not be ten minutes but 10 hours if my
>>> processing is very fast so it dramatically changes the output.
>>>
>>> Maybe a hybrid solution works: I window by event time but trigger results
>>> by system time for windows that have updated? Not really sure the details
>>> of making that work. Does that work? Are there concrete examples where you
>>> actually want the current behavior?
>>>
>>> -Jay
>>>
>>>
>>> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Thanks for the KIP. We were also in need of a mechanism to trigger
>>>> punctuate in the absence of events.
>>>>
>>>> As I described in [
>>>> https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036
>>>> ],
>>>>
>>>> - Our approach involved using the event time by default.
>>>> - The method to check if there is any punctuate ready in the
>>>>   PunctuationQueue is triggered via any event received by the stream
>>>>   thread, or at the polling intervals in the absence of any events.
>>>> - When we create Punctuate objects (which contain the next event time
>>>>   for punctuation and interval), we also record the creation time (system
>>>>   time).
>>>> - While checking for maturity of Punctuate Schedule by the mayBePunctuate
>>>>   method, we also check if the system clock has elapsed the punctuate
>>>>   interval since the schedule creation time.
>>>> - In the absence of any event, or in the absence of any event for one
>>>>   topic in the partition group assigned to the stream task, the system time
>>>>   will elapse the interval and we trigger a punctuate using the expected
>>>>   punctuation event time.
>>>> - We then create the next punctuation schedule as punctuation event time
>>>>   + punctuation interval, [again recording the system time of creation of
>>>>   the schedule].
>>>>
>>>> We call this a Hybrid Punctuate. Of course, this approach has pros and
>>>> cons.
>>>> Pros
>>>>
>>>> - Punctuates will happen in <punctuate interval> time duration at max in
>>>>   terms of system time.
>>>> - The semantics as a whole continues to revolve around event time.
>>>> - We can use the old data [old timestamps] to rerun any experiments or
>>>>   tests.
>>>>
>>>> Cons
>>>>
>>>> - In case the <punctuate interval> is not a time duration [say logical
>>>>   time/event count], then the approach might not be meaningful.
>>>> - In case there is a case where we have to wait for an actual event from
>>>>   a low event rate partition in the partition group, this approach will
>>>>   jump the gun.
>>>> - In case the event processing cannot catch up with the event rate and
>>>>   the expected timestamp events get queued for a long time, this approach
>>>>   might jump the gun.
>>>>
>>>> I believe the above approach and discussion goes close to the approach A.
>>>>
>>>> -----------
>>>>
>>>> I like the idea of having an event count based punctuate.
>>>>
>>>> -----------
>>>>
>>>> I agree with the discussion around approach C, that we should provide the
>>>> user with the option to choose system time or event time based punctuates.
>>>> But I believe that the user predominantly wants to use event time while not
>>>> missing out on regular punctuates due to event delays or event absences.
>>>> Hence a complex punctuate option as Matthias mentioned (quoted below) would
>>>> be most apt.
>>>>
>>>> "- We might want to add "complex" schedules later on (like, punctuate on
>>>> every 10 seconds event-time or 60 seconds system-time whatever comes
>>>> first)."
>>>>
>>>> -----------
>>>>
>>>> I think I read somewhere that Kafka Streams started with System Time as the
>>>> punctuation standard, but was later changed to Event Time. I guess there
>>>> would be some good reason behind it. As Kafka Streams wants to evolve more
>>>> on the Stream Processing front, I believe the emphasis on event time would
>>>> remain quite strong.
>>>>
>>>>
>>>> With Regards,
>>>>
>>>> Arun Mathew
>>>> Yahoo! JAPAN Corporation, Tokyo
>>>>
>>>>
>>>> On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com> wrote:
>>>>
>>>>> Yeah I like PunctuationType much better; I just threw Time out there
>>>>> more as a strawman than an actual suggestion ;) I still think it's
>>>>> worth considering what this buys us over an additional callback. I
>>>>> foresee a number of punctuate implementations following this pattern:
>>>>>
>>>>> public void punctuate(PunctuationType type) {
>>>>>     switch (type) {
>>>>>         case EVENT_TIME:
>>>>>             methodA();
>>>>>             break;
>>>>>         case SYSTEM_TIME:
>>>>>             methodB();
>>>>>             break;
>>>>>     }
>>>>> }
>>>>>
>>>>> I guess one advantage of this approach is we could add additional
>>>>> punctuation types later in a backwards compatible way (like event count
>>>>> as you mentioned).
>>>>>
>>>>> -Tommy
>>>>>
>>>>>
>>>>> On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
>>>>>> That sounds promising.
>>>>>>
>>>>>> I am just wondering if `Time` is the best name. Maybe we want to add
>>>>>> other non-time based punctuations at some point later. I would suggest
>>>>>>
>>>>>> enum PunctuationType {
>>>>>>     EVENT_TIME,
>>>>>>     SYSTEM_TIME,
>>>>>> }
>>>>>>
>>>>>> or similar. Just to keep the door open -- it's easier to add new stuff
>>>>>> if the name is more generic.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 4/4/17 5:30 AM, Thomas Becker wrote:
>>>>>>>
>>>>>>> I agree that the framework providing and managing the notion of stream
>>>>>>> time is valuable and not something we would want to delegate to the
>>>>>>> tasks. I'm not entirely convinced that a separate callback (option C)
>>>>>>> is that messy (it could just be a default method with an empty
>>>>>>> implementation), but if we wanted a single API to handle both cases,
>>>>>>> how about something like the following?
>>>>>>>
>>>>>>> enum Time {
>>>>>>>     STREAM,
>>>>>>>     CLOCK
>>>>>>> }
>>>>>>>
>>>>>>> Then on ProcessorContext:
>>>>>>> context.schedule(Time time, long interval) // We could allow this to
>>>>>>> be called once for each value of time to mix approaches.
>>>>>>>
>>>>>>> Then the Processor API becomes:
>>>>>>> punctuate(Time time) // time here denotes which schedule resulted in
>>>>>>> this call.
>>>>>>>
>>>>>>> Thoughts?
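
A minimal Java sketch of the single-callback idea discussed above, assuming the `PunctuationType` name and constants from this thread. The `Scheduler` class, the `Punctuator` interface, the extra `timestamp` argument, and the `advance` method are hypothetical stand-ins for illustration and are not the Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class PunctuationTypeSketch {

    /** Enum name and constants as suggested in the thread. */
    enum PunctuationType { EVENT_TIME, SYSTEM_TIME }

    /** Hypothetical single callback: the type tells the user which schedule fired. */
    interface Punctuator {
        void punctuate(PunctuationType type, long timestamp);
    }

    /** Hypothetical stand-in for ProcessorContext, holding one schedule per type. */
    static class Scheduler {
        private final Map<PunctuationType, Long> intervals = new EnumMap<>(PunctuationType.class);
        private final Map<PunctuationType, Long> nextDue = new EnumMap<>(PunctuationType.class);
        private final Punctuator punctuator;

        Scheduler(Punctuator punctuator) {
            this.punctuator = punctuator;
        }

        /** Registers (or replaces) the schedule for one type, counting from startTime. */
        void schedule(PunctuationType type, long intervalMs, long startTime) {
            if (intervalMs <= 0) {
                throw new IllegalArgumentException("interval must be positive");
            }
            intervals.put(type, intervalMs);
            nextDue.put(type, startTime + intervalMs);
        }

        /** Advances the clock of one type and fires every punctuation that became due. */
        void advance(PunctuationType type, long now) {
            if (!nextDue.containsKey(type)) {
                return; // nothing scheduled for this type
            }
            long interval = intervals.get(type);
            long due = nextDue.get(type);
            while (due <= now) {
                punctuator.punctuate(type, due);
                due += interval;
            }
            nextDue.put(type, due);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Scheduler s = new Scheduler((type, ts) -> calls.add(type + "@" + ts));
        s.schedule(PunctuationType.EVENT_TIME, 10_000L, 0L);
        s.schedule(PunctuationType.SYSTEM_TIME, 60_000L, 0L);
        s.advance(PunctuationType.EVENT_TIME, 25_000L);  // event time jumps to 25s
        s.advance(PunctuationType.SYSTEM_TIME, 60_000L); // wall clock reaches 60s
        System.out.println(calls); // [EVENT_TIME@10000, EVENT_TIME@20000, SYSTEM_TIME@60000]
    }
}
```

Because both schedules share one callback, a new enum constant (an event-count type, say) could be added without changing the callback signature, which is the extensibility argument made in the thread.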
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>> Thanks a lot for the KIP Michal,
>>>>>>>>
>>>>>>>> I was thinking about the four options you proposed in more detail
>>>>>>>> and these are my thoughts:
>>>>>>>>
>>>>>>>> (A) You argue that users can still "punctuate" on event-time via
>>>>>>>> process(), but I am not sure if this is possible. Note that users only
>>>>>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>>>>>> track the time progress per partition (based on the partitions they
>>>>>>>> observe via context.partition()). (This alone puts a huge burden on the
>>>>>>>> user by itself.) However, users are not notified at startup what
>>>>>>>> partitions are assigned, and users are not notified when partitions get
>>>>>>>> revoked. Because this information is not available, it's not possible to
>>>>>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>>>>>> process() seems not to be possible -- or do you see a way to get it
>>>>>>>> done? And even if, it might still be too clumsy to use.
>>>>>>>>
>>>>>>>> (B) This does not allow mixing both approaches, thus limiting what users
>>>>>>>> can do.
>>>>>>>>
>>>>>>>> (C) This should give all flexibility we need. However, just adding one
>>>>>>>> more method seems to be a solution that is too simple (cf my comments
>>>>>>>> below).
>>>>>>>>
>>>>>>>> (D) This might be hard to use. Also, I am not sure how a user could
>>>>>>>> enable system-time and event-time punctuation in parallel.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Overall option (C) seems to be the most promising approach to me.
>>>>>>>> Because I also favor a clean API, we might keep current punctuate()
>>>>>>>> as-is, but deprecate it -- so we can remove it at some later point when
>>>>>>>> people use the "new punctuate API".
>>>>>>>>
>>>>>>>>
>>>>>>>> Couple of follow up questions:
>>>>>>>>
>>>>>>>> - I am wondering if we should have two callback methods or just one
>>>>>>>> (ie, a unified one for system and event time punctuation or one for
>>>>>>>> each?).
>>>>>>>>
>>>>>>>> - If we have one, how can the user figure out which condition did
>>>>>>>> trigger?
>>>>>>>>
>>>>>>>> - How would the API look like, for registering different punctuate
>>>>>>>> schedules? The "type" must be somehow defined?
>>>>>>>>
>>>>>>>> - We might want to add "complex" schedules later on (like, punctuate on
>>>>>>>> every 10 seconds event-time or 60 seconds system-time whatever comes
>>>>>>>> first). I don't say we should add this right away, but we might want to
>>>>>>>> define the API in a way that it allows extensions like this later on,
>>>>>>>> without redesigning the API (ie, the API should be designed extensible)
>>>>>>>>
>>>>>>>> - Did you ever consider count-based punctuation?
>>>>>>>>
>>>>>>>>
>>>>>>>> I understand that you would like to solve a simple problem, but we
>>>>>>>> learned from the past that just "adding some API" quickly leads to a
>>>>>>>> not very well defined API that needs time consuming clean up later on
>>>>>>>> via other KIPs. Thus, I would prefer to get a holistic punctuation KIP
>>>>>>>> with this from the beginning on to avoid later painful redesign.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/3/17 11:58 AM, Michal Borowiecki wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks Thomas,
>>>>>>>>>
>>>>>>>>> I'm also wary of changing the existing semantics of punctuate, for
>>>>>>>>> backward compatibility reasons, although I like the conceptual
>>>>>>>>> simplicity of that option.
>>>>>>>>>
>>>>>>>>> Adding a new method to me feels safer but, in a way, uglier. I added
>>>>>>>>> this to the KIP now as option (C).
>>>>>>>>>
>>>>>>>>> The TimestampExtractor mechanism is actually more flexible, as it allows
>>>>>>>>> you to return any value, you're not limited to event time or system time
>>>>>>>>> (although I don't see an actual use case where you might need anything
>>>>>>>>> else than those two). Hence I also proposed the option to allow users
>>>>>>>>> to, effectively, decide what "stream time" is for them given the
>>>>>>>>> presence or absence of messages, much like they can decide what msg time
>>>>>>>>> means for them using the TimestampExtractor. What do you think about
>>>>>>>>> that? This is probably most flexible but also most complicated.
>>>>>>>>>
>>>>>>>>> All comments appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Michal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 03/04/17 19:23, Thomas Becker wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Although I fully agree we need a way to trigger periodic processing
>>>>>>>>>> that is independent from whether and when messages arrive, I'm not sure
>>>>>>>>>> I like the idea of changing the existing semantics across the board.
>>>>>>>>>> What if we added an additional callback to Processor that can be
>>>>>>>>>> scheduled similarly to punctuate() but was always called at fixed, wall
>>>>>>>>>> clock based intervals?
>>>>>>>>>> This way you wouldn't have to give up the notion
>>>>>>>>>> of stream time to be able to do periodic processing.
>>>>>>>>>>
>>>>>>>>>> On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have created a draft for KIP-138: Change punctuate semantics
>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
>>>>>>>>>>>
>>>>>>>>>>> Appreciating there can be different views on system-time vs event-time
>>>>>>>>>>> semantics for punctuation depending on use-case and the importance of
>>>>>>>>>>> backwards compatibility of any such change, I've left it quite open and
>>>>>>>>>>> hope to fill in more info as the discussion progresses.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Michal
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>> Tommy Becker
>>>>>>>
>>>>>>> Senior Software Engineer
>>>>>>>
>>>>>>> O +1 919.460.4747
>>>>>>>
>>>>>>> tivo.com
>>>>>>>
>>>>>>>
>>>>>>> ________________________________
>>>>>>>
>>>>>>> This email and any attachments may contain confidential and
>>>>>>> privileged material for the sole use of the intended recipient. Any
>>>>>>> review, copying, or distribution of this email (or any attachments)
>>>>>>> by others is prohibited. If you are not the intended recipient,
>>>>>>> please contact the sender immediately and permanently delete this
>>>>>>> email and any attachments. No employee or agent of TiVo Inc. is
>>>>>>> authorized to conclude any binding agreement on behalf of TiVo Inc.
>>>>>>> by email. Binding agreements with TiVo Inc. may only be made by a
>>>>>>> signed written agreement.
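
The Hybrid Punctuate scheme Arun describes in this thread (punctuate on event time, but fall back to system time once a full interval has elapsed since the schedule was created) can be sketched roughly as follows. This is only an illustration under stated assumptions: the `HybridSchedule` class, its fields, and the `maybePunctuate` signature are hypothetical and are not taken from the Kafka Streams code base or from Arun's patch.

```java
import java.util.OptionalLong;

public class HybridPunctuateSketch {

    /** Hypothetical schedule entry: the next event-time due point plus its creation time. */
    static final class HybridSchedule {
        private final long intervalMs;
        private long nextEventTime;   // event time at which the next punctuation is due
        private long createdAtSystem; // system time at which this schedule entry was created

        HybridSchedule(long firstEventTime, long intervalMs, long systemNow) {
            if (intervalMs <= 0) {
                throw new IllegalArgumentException("interval must be positive");
            }
            this.intervalMs = intervalMs;
            this.nextEventTime = firstEventTime;
            this.createdAtSystem = systemNow;
        }

        /**
         * Fires if stream time reached the due point, or if a full interval of
         * system time has passed since the entry was created. Returns the
         * expected punctuation event time when it fires, empty otherwise.
         */
        OptionalLong maybePunctuate(long streamTime, long systemNow) {
            boolean eventDue = streamTime >= nextEventTime;
            boolean systemDue = systemNow - createdAtSystem >= intervalMs;
            if (!eventDue && !systemDue) {
                return OptionalLong.empty();
            }
            long fired = nextEventTime;
            // Next schedule = punctuation event time + interval, re-recording creation time.
            nextEventTime = fired + intervalMs;
            createdAtSystem = systemNow;
            return OptionalLong.of(fired);
        }
    }

    public static void main(String[] args) {
        HybridSchedule s = new HybridSchedule(60_000L, 60_000L, 0L);
        System.out.println(s.maybePunctuate(30_000L, 10_000L)); // not due yet
        System.out.println(s.maybePunctuate(61_000L, 20_000L)); // event time crossed 60s
        System.out.println(s.maybePunctuate(61_000L, 50_000L)); // no events, only 30s elapsed
        System.out.println(s.maybePunctuate(61_000L, 80_000L)); // no events, 60s elapsed
    }
}
```

In this sketch the fourth call fires with the expected event time 120000 even though stream time is stuck at 61000, which is the "jump the gun" behavior listed among the cons: a late event for that window could still arrive afterwards.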