I apologize for the long email below. In my defense, it started out much shorter. :-) Also, to be super-clear: I am intentionally playing devil's advocate for a number of the arguments brought forth, in order to help improve this KIP -- I am not implying that I necessarily disagree with them.
That aside, here are some further thoughts.

First, there are (at least?) two categories of actions/behavior you invoke via punctuate():

1. For internal housekeeping of your Processor or Transformer (e.g., to periodically commit to a custom store, or to do metrics/logging). Here, the impact of punctuate is typically not observable by other processing nodes in the topology.
2. For controlling the emit frequency of downstream records. Here, punctuate is all about being observable by downstream processing nodes.

A few releases back, we introduced record caches (DSL) and state store caches (Processor API) in KIP-63. There, we addressed a concern relating to (2). Some users needed to control, or more precisely limit, the downstream output rate of Kafka Streams because the downstream systems/apps could not keep up with the upstream output rate (Kafka's scalability > their scalability). The argument for KIP-63, which notably did not introduce a "trigger" API, was that such an interaction with downstream systems is an operational concern. It should not impact the processing *logic* of your application, and thus we didn't want to complicate the Kafka Streams API with such operational concerns, especially not the declarative DSL.

This KIP's discussion on `punctuate()` takes us back in time (<-- sorry, I couldn't resist making this pun :-P).

As a meta-comment, I am observing that our conversation is moving more and more in the direction of explicit "triggers". So far, I have seen motivations only for use cases in category (2), but none yet for (1)? For example, some comments voiced here are about something like "IF stream-time didn't trigger punctuate, THEN trigger punctuate based on processing-time". Do we want this, and if so, for which use cases and benefits? Also, on a related note, whatever we are discussing here will impact state store caches (Processor API) and perhaps also record caches (DSL), so we should clarify any such impact here.
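To make the KIP-63 argument concrete: a record cache limits the downstream output rate by absorbing repeated updates to the same key. It forwards only the latest value, either when the cache is flushed (e.g., on commit) or when it overflows. The following is a minimal toy model of that behavior, assuming a cache bounded by entry count that evicts the least recently updated key. `ToyRecordCache` and its methods are hypothetical and are not Kafka Streams' actual cache classes, which are sized in bytes rather than entries.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of a KIP-63-style record cache (hypothetical class, not the real
 * Kafka Streams implementation). Repeated updates to the same key overwrite
 * each other while cached, so downstream only sees the latest value per key.
 */
class ToyRecordCache<K, V> {
    private final int maxEntries;
    // Insertion order doubles as "least recently updated first".
    private final LinkedHashMap<K, V> dirty = new LinkedHashMap<>();
    private final List<Map.Entry<K, V>> forwarded = new ArrayList<>();

    ToyRecordCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    /** Absorbs an update; forwards the least recently updated key only on overflow. */
    void put(K key, V value) {
        dirty.remove(key);      // re-insert so this key becomes the most recently updated
        dirty.put(key, value);
        if (dirty.size() > maxEntries) {
            Iterator<Map.Entry<K, V>> it = dirty.entrySet().iterator();
            Map.Entry<K, V> eldest = it.next();
            forward(eldest.getKey(), eldest.getValue());
            it.remove();
        }
    }

    /** Forwards every pending update, e.g. when the task commits. */
    void flush() {
        for (Map.Entry<K, V> e : dirty.entrySet()) {
            forward(e.getKey(), e.getValue());
        }
        dirty.clear();
    }

    /** Records sent downstream so far, in forwarding order. */
    List<Map.Entry<K, V>> downstream() {
        return forwarded;
    }

    private void forward(K key, V value) {
        forwarded.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
    }
}
```

For example, `put("a", 1L)`, `put("a", 2L)`, `put("a", 3L)` and `put("b", 1L)` followed by `flush()` send two records downstream (`a=3`, then `b=1`) instead of four. The application logic is unchanged; only the output rate differs.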
Switching topics slightly. Jay wrote:

> One thing I've always found super important for this kind of design work
> is to do a really good job of cataloging the landscape of use cases and
> how prevalent each one is.

+1 to this, as others have already said. Let me highlight something, just in case. In the recent emails we talked about windowing use cases, but the Processor API (where `punctuate` resides) does not have any notion of windowing at all. If you want to do windowing *in the Processor API*, you must do so manually in combination with window stores. For this reason I'd suggest discussing use cases not just in general, but also in view of how you'd implement them in the Processor API vs. in the DSL. Right now, changing/improving `punctuate` does not impact the DSL at all, unless we add new functionality to it.

Jay wrote in his strawman example:

> You aggregate click and impression data for a reddit like site. Every ten
> minutes you want to output a ranked list of the top 10 articles ranked by
> clicks/impressions for each geographical area. I want to be able run this
> in steady state as well as rerun to regenerate results (or catch up if it
> crashes).

This is a good example for more than the obvious reason. In KIP-63, we argued that saying "every ten minutes" above is not necessarily about outputting data *exactly* after ten minutes. It is about performing an aggregation based on 10-minute windows of input data. In other words, the point is about specifying the input for your aggregation, not (or less) about when the results of the aggregation should be sent downstream. To take an extreme example, you could disable record caches and let your app output a downstream update for every incoming input record. If the last input record was from minute 7 of 10 (for a 10-min window), then what your app would output at minute 10 would be identical to what it had already emitted at minute 7 anyway.
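The minute-7 argument can be checked with a small sketch. It assumes 10-minute tumbling windows over event-time (aligned to the epoch) and a processor that forwards an updated count for every input record, as with record caches disabled. `TumblingWindowCounter` and its methods are hypothetical and not part of the Processor API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: a 10-minute tumbling-window count that forwards an
 * updated result for every input record (as with record caches disabled).
 * Timestamps are non-negative event-time milliseconds.
 */
class TumblingWindowCounter {
    static final long WINDOW_MS = 10 * 60 * 1000L;

    private final Map<Long, Long> countsByWindowStart = new HashMap<>();
    private final List<long[]> updates = new ArrayList<>(); // each entry: {windowStart, count}

    /** Start of the tumbling window containing the given event-time timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /** Updates the window's count and "forwards" the new result immediately. */
    void process(long eventTimeMs) {
        long start = windowStart(eventTimeMs);
        long count = countsByWindowStart.merge(start, 1L, Long::sum);
        updates.add(new long[] {start, count});
    }

    /** What an emission at the end of the window would send for that window. */
    long resultAt(long windowStartMs) {
        return countsByWindowStart.getOrDefault(windowStartMs, 0L);
    }

    /** All per-record updates forwarded so far, in order. */
    List<long[]> updates() {
        return updates;
    }
}
```

Feeding records with event-times at minutes 1, 4 and 7 forwards counts 1, 2 and 3 for the window starting at 0. `resultAt(0)` is then 3, the same value that was already forwarded at minute 7.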
This is particularly true when we take late-arriving data into account. If a late record arrived at minute 13, your app would (by default) send a new update downstream, even though the "original" 10 minutes have already passed.

Jay wrote...:

> There are a couple of tricky things that seem to make this hard with either
> of the options proposed:
> 1. If I emit this data using event time I have the problem described where
> a geographical region with no new clicks or impressions will fail to output
> results.

...and Arun Mathew wrote:

> We window by the event time, but trigger punctuate in <punctuate interval>
> duration of system time, in the absence of an event crossing the punctuate
> event time.

So, given what I wrote above about the status quo and what you can already do with it, is this the concern? The state store cache doesn't give you *direct* control over "forcing an output after no later than X seconds [of processing-time]", only indirect control through a cache size. (Note that I am not dismissing the reasons why such control might be helpful.)

Arun Mathew wrote:

> We are using Kafka Stream for our Audit Trail, where we need to output the
> event counts on each topic on each cluster aggregated over a 1 minute
> window. We have to use event time to be able to cross check the counts. But
> we need to trigger punctuate [aggregate event pushes] by system time in the
> absence of events. Otherwise the event counts for unexpired windows would
> be 0 which is bad.

Isn't the latter ("count would be 0") really about distinguishing the absence of output from an output of 0? This is similar to `Option[T]` in Scala and the difference between `None` and `Some(0)`. That is, isn't the root cause that the downstream system interprets the absence of output in a particular way ("No output after 1 minute = I consider the output to be 0.")? Arguably, you could also adapt the downstream system (if possible) to correctly handle the difference between absence of output vs. output of 0.
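The following minimal sketch shows the downstream-side alternative. It assumes the consumer keeps the latest count it received per window start. `java.util.Optional` plays the role of Scala's `Option`, so "no output received" (`Optional.empty()`, like `None`) stays distinct from "a count of 0 was received" (`Optional.of(0L)`, like `Some(0)`). `WindowCountView` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical downstream-side view over the latest count received per window
 * (keyed by window start). Absence of output is never turned into 0.
 */
class WindowCountView {
    /** Optional.empty() if no result has been received for the window yet. */
    static Optional<Long> latestCount(Map<Long, Long> receivedByWindowStart, long windowStart) {
        return Optional.ofNullable(receivedByWindowStart.get(windowStart));
    }

    /** Renders a window without conflating "no output yet" with "count is 0". */
    static String describe(Optional<Long> count) {
        return count.map(c -> "count=" + c).orElse("no output yet");
    }
}
```

With this distinction in place, a window for which nothing has arrived yet is reported as "no output yet" rather than as a count of 0.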
I am not implying that we shouldn't care about such a use case, but I want to understand the motivation better. :-)

Also, to add some perspective: in some related discussions we talked about how a Kafka Streams application should not worry about, or be unnecessarily coupled to, such interpretation specifics of a downstream system's behavior. After all, tomorrow your app's output might be consumed by more than just this one downstream system. Arguably, Kafka Connect rather than Kafka Streams might be the best tool to link the universes of Kafka and downstream systems. That includes helping to reconcile the differences in how these systems interpret changes, updates, late-arriving data, etc. Thanks to specific sink connectors (e.g. for Elastic, Cassandra, MySQL, S3, etc.), Kafka Connect would allow you to decouple the Kafka Streams app's logical processing from the specifics of downstream systems. Would this decoupling with Kafka Connect help here? (And if the answer is "Yes, but it's currently awkward to use Connect for this", this might be a problem we can solve, too.)

Switching topics slightly again. Thomas wrote:

> I'm not entirely convinced that a separate callback (option C)
> is that messy (it could just be a default method with an empty
> implementation), but if we wanted a single API to handle both cases,
> how about something like the following?
>
> enum Time {
>    STREAM,
>    CLOCK
> }

Yeah, I am on the fence here, too. If we use the 1-method approach, then whatever the user is doing inside this method is a black box to Kafka Streams. This is similar to how we have no idea what the user does inside a `foreach`: if the function passed to `foreach` writes to external systems, then Kafka Streams is totally unaware of the fact. We won't know, for example, if the stream-time action has a smaller "trigger" frequency than the processing-time action.
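To illustrate the black-box concern, here is a minimal sketch of a single-callback processor that custom-codes a "not later than" rule: emit after at most one interval of stream-time or of system-time, whichever comes first. It assumes a hypothetical framework that calls `punctuate(type, streamTimeMs, systemTimeMs)`. The `PunctuationType` values follow Matthias's strawman enum quoted in this thread, while the class name and the two time parameters are made up for illustration. The framework only ever sees two kinds of `punctuate()` calls; the rule that ties them together lives entirely in user code.

```java
import java.util.ArrayList;
import java.util.List;

/** Strawman enum from this thread; not a released Kafka Streams type. */
enum PunctuationType { EVENT_TIME, SYSTEM_TIME }

/**
 * Hypothetical processor with one punctuate callback. It emits when either
 * timeline has advanced by at least intervalMs since the last emit, and every
 * emit resets both timelines: "whichever comes first".
 */
class NotLaterThanProcessor {
    private final long intervalMs;
    private long lastEmitStreamTimeMs;
    private long lastEmitSystemTimeMs;
    private final List<PunctuationType> emits = new ArrayList<>();

    NotLaterThanProcessor(long intervalMs, long startStreamTimeMs, long startSystemTimeMs) {
        this.intervalMs = intervalMs;
        this.lastEmitStreamTimeMs = startStreamTimeMs;
        this.lastEmitSystemTimeMs = startSystemTimeMs;
    }

    /** Called by the hypothetical framework for either schedule type. */
    void punctuate(PunctuationType type, long streamTimeMs, long systemTimeMs) {
        boolean due;
        switch (type) {
            case EVENT_TIME:
                due = streamTimeMs - lastEmitStreamTimeMs >= intervalMs;
                break;
            case SYSTEM_TIME:
                due = systemTimeMs - lastEmitSystemTimeMs >= intervalMs;
                break;
            default:
                throw new IllegalArgumentException("unknown type: " + type);
        }
        if (due) {
            emits.add(type);                     // stand-in for forwarding results
            lastEmitStreamTimeMs = streamTimeMs; // reset both timelines
            lastEmitSystemTimeMs = systemTimeMs;
        }
    }

    /** Which schedule type caused each emit, in order. */
    List<PunctuationType> emits() {
        return emits;
    }
}
```

Take a 60-second interval. A `SYSTEM_TIME` call at 60 s of system-time emits even though stream-time has only reached 30 s (no new events). A later `EVENT_TIME` call emits once stream-time reaches 90 s. Kafka Streams would see the calls, but not the rule connecting them.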
Or, we won't know whether the user custom-codes a "not later than" trigger logic ("Do X every 1 minute of stream-time or 1 minute of processing-time, whichever comes first"). That said, I am not yet certain whether we would need such knowledge. When using the Processor API, most of the work and decisions must be done by the user anyway. It would matter, though, if the concept of "triggers" were to bubble up into the DSL, because in the DSL the management of windowing, window stores, etc. must be done automatically by Kafka Streams.

[In any case, btw, we have the corner case where the user configured stream-time to be processing-time (e.g. via a wall-clock timestamp extractor). At that point both punctuate variants are based on the same time semantics / timeline.]

Again, I apologize for the wall of text. Congratulations if you made it this far. :-)

More than happy to hear your thoughts!
Michael

On Tue, Apr 11, 2017 at 1:12 AM, Arun Mathew <arunmathe...@gmail.com> wrote:
> Thanks Matthias.
> Sure, will correct it right away.
>
> On 11-Apr-2017 8:07 AM, "Matthias J. Sax" <matth...@confluent.io> wrote:
> Thanks for preparing this page!
>
> About terminology:
>
> You introduce the term "event time" -- but we should call this "stream time" -- "stream time" is whatever TimestampExtractor returns and this could be event time, ingestion time, or processing/wall-clock time.
>
> Does this make sense to you?
>
> -Matthias
>
> On 4/10/17 4:58 AM, Arun Mathew wrote:
> Thanks Ewen.
>
> @Michal, @all, I have created a child page to start the Use Cases discussion [https://cwiki.apache.org/confluence/display/KAFKA/Punctuate+Use+Cases]. Please go through it and give your comments.
>
> @Tianji, Sorry for the delay. I am trying to make the patch public.
>
> --
> Arun Mathew
>
> On 4/8/17, 02:00, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> Arun,
>
> I've given you permission to edit the wiki.
> Let me know if you run into any issues.
>
> -Ewen
>
> On Fri, Apr 7, 2017 at 1:21 AM, Arun Mathew <amat...@yahoo-corp.jp> wrote:
> Thanks Michal. I don't have the access yet [arunmathew88]. Should I be sending a separate mail for this?
>
> I thought one of the people following this thread would be able to give me access.
>
> *From: *Michal Borowiecki <michal.borowie...@openbet.com>
> *Reply-To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
> *Date: *Friday, April 7, 2017 at 17:16
> *To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
> *Subject: *Re: [DISCUSS] KIP-138: Change punctuate semantics
>
> Hi Arun,
>
> I was thinking along the same lines as you, listing the use cases on the wiki, but didn't find time to get around to doing that yet. Don't mind if you do it if you have access now. I was thinking it would be nice if, once we have the use cases listed, people could use likes to up-vote the use cases similar to what they're working on.
>
> I should have a bit more time to action this in the next few days, but happy for you to do it if you can beat me to it ;-)
>
> Cheers,
> Michal
>
> On 07/04/17 04:39, Arun Mathew wrote:
> Sure, Thanks Matthias. My id is [arunmathew88].
>
> Of course. I was thinking of a subpage where people can collaborate.
>
> Will do as per Michael's suggestion.
>
> Regards,
> Arun Mathew
>
> On 4/7/17, 12:30, "Matthias J. Sax" <matth...@confluent.io> wrote:
> Please share your Wiki-ID and a committer can give you write access.
>
> Btw: as you did not initiate the KIP, you should not change the KIP without the permission of the original author -- in this case Michael.
>
> So you might also just share your thoughts over the mailing list and Michael can update the KIP page. Or, as an alternative, just create a subpage for the KIP page.
>
> @Michael: WDYT?
>
> -Matthias
>
> On 4/6/17 8:05 PM, Arun Mathew wrote:
> Hi Jay,
> Thanks for the advice, I would like to list down the use cases as per your suggestion. But it seems I don't have write permission to the Apache Kafka Confluent Space. Whom shall I ask for it?
>
> Regarding your last question. We are using a patch in our production system which does exactly this. We window by the event time, but trigger punctuate in <punctuate interval> duration of system time, in the absence of an event crossing the punctuate event time.
>
> We are using Kafka Stream for our Audit Trail, where we need to output the event counts on each topic on each cluster aggregated over a 1 minute window. We have to use event time to be able to cross check the counts. But we need to trigger punctuate [aggregate event pushes] by system time in the absence of events. Otherwise the event counts for unexpired windows would be 0 which is bad.
>
> "Maybe a hybrid solution works: I window by event time but trigger results by system time for windows that have updated? Not really sure the details of making that work. Does that work? Are there concrete examples where you actually want the current behavior?"
>
> --
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation
>
> On Wed, Apr 5, 2017 at 8:48 PM, Tianji Li <skyah...@gmail.com> wrote:
> Hi Jay,
>
> The hybrid solution is exactly what I expect and need for our use cases when dealing with telecom data.
>
> Thanks
> Tianji
>
> On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:
> Hey guys,
>
> One thing I've always found super important for this kind of design work is to do a really good job of cataloging the landscape of use cases and how prevalent each one is. By that I mean not just listing lots of uses, but also grouping them into categories that functionally need the same thing. In the absence of this it is very hard to reason about design proposals. From the proposals so far I think we have a lot of discussion around possible apis, but less around what the user needs for different use cases and how they would implement that using the api.
>
> Here is an example:
> You aggregate click and impression data for a reddit like site. Every ten minutes you want to output a ranked list of the top 10 articles ranked by clicks/impressions for each geographical area. I want to be able run this in steady state as well as rerun to regenerate results (or catch up if it crashes).
>
> There are a couple of tricky things that seem to make this hard with either of the options proposed:
> 1. If I emit this data using event time I have the problem described where a geographical region with no new clicks or impressions will fail to output results.
> 2. If I emit this data using system time I have the problem that when reprocessing data my window may not be ten minutes but 10 hours if my processing is very fast so it dramatically changes the output.
>
> Maybe a hybrid solution works: I window by event time but trigger results by system time for windows that have updated? Not really sure the details of making that work. Does that work? Are there concrete examples where you actually want the current behavior?
>
> -Jay
>
> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com> wrote:
> Hi All,
>
> Thanks for the KIP. We were also in need of a mechanism to trigger punctuate in the absence of events.
>
> As I described in [https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036],
>
> - Our approach involved using the event time by default.
> - The method to check if there is any punctuate ready in the PunctuationQueue is triggered via any event received by the stream thread, or at the polling intervals in the absence of any events.
> - When we create Punctuate objects (which contain the next event time for punctuation and the interval), we also record the creation time (system time).
> - While checking for maturity of the Punctuate schedule in the mayBePunctuate method, we also check if the system clock has elapsed the punctuate interval since the schedule creation time.
> - In the absence of any event, or in the absence of any event for one topic in the partition group assigned to the stream task, the system time will elapse the interval and we trigger a punctuate using the expected punctuation event time.
> - We then create the next punctuation schedule as punctuation event time + punctuation interval [again recording the system time of creation of the schedule].
>
> We call this a Hybrid Punctuate. Of course, this approach has pros and cons.
>
> Pros
>
> - Punctuates will happen in <punctuate interval> time duration at max in terms of system time.
> - The semantics as a whole continues to revolve around event time.
> - We can use the old data [old timestamps] to rerun any experiments or tests.
>
> Cons
>
> - In case the <punctuate interval> is not a time duration [say logical time/event count], then the approach might not be meaningful.
> - In case there is a case where we have to wait for an actual event from a low event rate partition in the partition group, this approach will jump the gun.
> - In case the event processing cannot catch up with the event rate and the expected timestamp events get queued for a long time, this approach might jump the gun.
>
> I believe the above approach and discussion come close to approach A.
>
> -----------
>
> I like the idea of having an event count based punctuate.
>
> -----------
>
> I agree with the discussion around approach C, that we should provide the user with the option to choose system time or event time based punctuates. But I believe that the user predominantly wants to use event time while not missing out on regular punctuates due to event delays or event absences. Hence a complex punctuate option as Matthias mentioned (quoted below) would be most apt.
>
> "- We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time whatever comes first)."
>
> -----------
>
> I think I read somewhere that Kafka Streams started with System Time as the punctuation standard, but was later changed to Event Time. I guess there would be some good reason behind it.
As Kafka Streams want > to evolve > > > > > > >> more > > > > > > >>>> on the Stream Processing front, I believe the emphasis on > event time > > > > > > >>> would > > > > > > >>>> remain quite strong. > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> With Regards, > > > > > > >>>> > > > > > > >>>> Arun Mathew > > > > > > >>>> Yahoo! JAPAN Corporation, Tokyo > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker < > tobec...@tivo.com> <tobec...@tivo.com> > > > > > > >> wrote: > > > > > > >>>> > > > > > > >>>>> Yeah I like PuncutationType much better; I just threw > Time out there > > > > > > >>>>> more as a strawman than an actual suggestion ;) I still > think it's > > > > > > >>>>> worth considering what this buys us over an additional > callback. I > > > > > > >>>>> foresee a number of punctuate implementations following > this pattern: > > > > > > >>>>> > > > > > > >>>>> public void punctuate(PunctuationType type) { > > > > > > >>>>> switch (type) { > > > > > > >>>>> case EVENT_TIME: > > > > > > >>>>> methodA(); > > > > > > >>>>> break; > > > > > > >>>>> case SYSTEM_TIME: > > > > > > >>>>> methodB(); > > > > > > >>>>> break; > > > > > > >>>>> } > > > > > > >>>>> } > > > > > > >>>>> > > > > > > >>>>> I guess one advantage of this approach is we could add > additional > > > > > > >>>>> punctuation types later in a backwards compatible way > (like event > > > > > > >> count > > > > > > >>>>> as you mentioned). > > > > > > >>>>> > > > > > > >>>>> -Tommy > > > > > > >>>>> > > > > > > >>>>> > > > > > > >>>>> On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote: > > > > > > >>>>>> That sounds promising. > > > > > > >>>>>> > > > > > > >>>>>> I am just wondering if `Time` is the best name. Maybe we > want to > > > > > > >> add > > > > > > >>>>>> other non-time based punctuations at some point later. 
I > would > > > > > > >>>>>> suggest > > > > > > >>>>>> > > > > > > >>>>>> enum PunctuationType { > > > > > > >>>>>> EVENT_TIME, > > > > > > >>>>>> SYSTEM_TIME, > > > > > > >>>>>> } > > > > > > >>>>>> > > > > > > >>>>>> or similar. Just to keep the door open -- it's easier to > add new > > > > > > >>>>>> stuff > > > > > > >>>>>> if the name is more generic. > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>> -Matthias > > > > > > >>>>>> > > > > > > >>>>>> > > > > > > >>>>>> On 4/4/17 5:30 AM, Thomas Becker wrote: > > > > > > >>>>>>> > > > > > > >>>>>>> I agree that the framework providing and managing the > notion of > > > > > > >>>>>>> stream > > > > > > >>>>>>> time is valuable and not something we would want to > delegate to > > > > > > >> the > > > > > > >>>>>>> tasks. I'm not entirely convinced that a separate > callback > > > > > > >> (option > > > > > > >>>>>>> C) > > > > > > >>>>>>> is that messy (it could just be a default method with > an empty > > > > > > >>>>>>> implementation), but if we wanted a single API to > handle both > > > > > > >>>>>>> cases, > > > > > > >>>>>>> how about something like the following? > > > > > > >>>>>>> > > > > > > >>>>>>> enum Time { > > > > > > >>>>>>> STREAM, > > > > > > >>>>>>> CLOCK > > > > > > >>>>>>> } > > > > > > >>>>>>> > > > > > > >>>>>>> Then on ProcessorContext: > > > > > > >>>>>>> context.schedule(Time time, long interval) // We could > allow > > > > > > >> this > > > > > > >>>>>>> to > > > > > > >>>>>>> be called once for each value of time to mix > approaches. > > > > > > >>>>>>> > > > > > > >>>>>>> Then the Processor API becomes: > > > > > > >>>>>>> punctuate(Time time) // time here denotes which > schedule resulted > > > > > > >>>>>>> in > > > > > > >>>>>>> this call. > > > > > > >>>>>>> > > > > > > >>>>>>> Thoughts? > > > > > > >>>>>>> > > > > > > >>>>>>> > > > > > > >>>>>>> On Mon, 2017-04-03 at 22:44 -0700, Matthias J. 
Sax > wrote: > > > > > > >>>>>>>> > > > > > > >>>>>>>> Thanks a lot for the KIP Michal, > > > > > > >>>>>>>> > > > > > > >>>>>>>> I was thinking about the four options you proposed in > more > > > > > > >>>>>>>> details > > > > > > >>>>>>>> and > > > > > > >>>>>>>> this are my thoughts: > > > > > > >>>>>>>> > > > > > > >>>>>>>> (A) You argue, that users can still "punctuate" on > event-time > > > > > > >> via > > > > > > >>>>>>>> process(), but I am not sure if this is possible. > Note, that > > > > > > >>>>>>>> users > > > > > > >>>>>>>> only > > > > > > >>>>>>>> get record timestamps via context.timestamp(). Thus, > users > > > > > > >> would > > > > > > >>>>>>>> need > > > > > > >>>>>>>> to > > > > > > >>>>>>>> track the time progress per partition (based on the > partitions > > > > > > >>>>>>>> they > > > > > > >>>>>>>> obverse via context.partition(). (This alone puts a > huge burden > > > > > > >>>>>>>> on > > > > > > >>>>>>>> the > > > > > > >>>>>>>> user by itself.) However, users are not notified at > startup > > > > > > >> what > > > > > > >>>>>>>> partitions are assigned, and user are not notified > when > > > > > > >>>>>>>> partitions > > > > > > >>>>>>>> get > > > > > > >>>>>>>> revoked. Because this information is not available, > it's not > > > > > > >>>>>>>> possible > > > > > > >>>>>>>> to > > > > > > >>>>>>>> "manually advance" stream-time, and thus event-time > punctuation > > > > > > >>>>>>>> within > > > > > > >>>>>>>> process() seems not to be possible -- or do you see a > way to > > > > > > >> get > > > > > > >>>>>>>> it > > > > > > >>>>>>>> done? And even if, it might still be too clumsy to > use. > > > > > > >>>>>>>> > > > > > > >>>>>>>> (B) This does not allow to mix both approaches, thus > limiting > > > > > > >>>>>>>> what > > > > > > >>>>>>>> users > > > > > > >>>>>>>> can do. > > > > > > >>>>>>>> > > > > > > >>>>>>>> (C) This should give all flexibility we need. 
> However, just adding one more method seems to be a solution that is too simple (cf. my comments below).
>
> (D) This might be hard to use. Also, I am not sure how a user could enable system-time and event-time punctuation in parallel.
>
> Overall, option (C) seems to be the most promising approach to me. Because I also favor a clean API, we might keep the current punctuate() as-is, but deprecate it -- so we can remove it at some later point when people use the "new punctuate API".
>
> A couple of follow-up questions:
>
> - I am wondering if we should have two callback methods or just one (ie, a unified one for system and event time punctuation, or one for each?).
> - If we have one, how can the user figure out which condition triggered?
> - What would the API look like for registering different punctuate schedules? The "type" must be somehow defined?
> - We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time whatever comes first).
>   I don't say we should add this right away, but we might want to define the API in a way that allows extensions like this later on, without redesigning the API (ie, the API should be designed to be extensible).
> - Did you ever consider count-based punctuation?
>
> I understand that you would like to solve a simple problem, but we learned from the past that just "adding some API" quickly leads to a not very well defined API that needs time-consuming clean-up later on via other KIPs. Thus, I would prefer to get a holistic punctuation KIP with this from the beginning on, to avoid a painful redesign later.
>
> -Matthias
>
> On 4/3/17 11:58 AM, Michal Borowiecki wrote:
> Thanks Thomas,
>
> I'm also wary of changing the existing semantics of punctuate, for backward compatibility reasons, although I like the conceptual simplicity of that option.
>
> Adding a new method to me feels safer but, in a way, uglier. I added this to the KIP now as option (C).
>
> The TimestampExtractor mechanism is actually more flexible, as it allows you to return any value; you're not limited to event time or system time (although I don't see an actual use case where you might need anything else than those two). Hence I also proposed the option to allow users to, effectively, decide what "stream time" is for them given the presence or absence of messages, much like they can decide what msg time means for them using the TimestampExtractor. What do you think about that? This is probably the most flexible but also the most complicated option.
>
> All comments appreciated.
>
> Cheers,
>
> Michal
>
> On 03/04/17 19:23, Thomas Becker wrote:
> Although I fully agree we need a way to trigger periodic processing that is independent from whether and when messages arrive, I'm not sure I like the idea of changing the existing semantics across the board.
What if we added an additional callback to Processor that can be scheduled similarly to punctuate() but is always called at fixed, wall-clock-based intervals? This way you wouldn't have to give up the notion of stream time to be able to do periodic processing.

On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:

Hi all,

I have created a draft for KIP-138: Change punctuate semantics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.

Appreciating that there can be different views on system-time vs event-time semantics for punctuation, depending on the use case and the importance of backward compatibility of any such change, I've left it quite open and hope to fill in more info as the discussion progresses.
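Thomas's suggestion, a separate callback driven by fixed wall-clock intervals alongside stream-time punctuate(), can be sketched as a small simulation in plain Java with no Kafka dependency. It assumes, as a simplification, that stream time is just the largest record timestamp seen so far and that the task is polled periodically even when no record arrives; `PunctuationSimulation` and `onPoll` are hypothetical names, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one task with two periodic triggers:
//  - a stream-time punctuation that only moves when records arrive, and
//  - an additional wall-clock callback that fires on elapsed system time.
class PunctuationSimulation {
    private final long interval;
    private long streamTime = 0; // simplification: max record timestamp seen so far
    private long nextStreamPunctuation;
    private long nextWallClockPunctuation;
    final List<Long> streamTimeFirings = new ArrayList<>();
    final List<Long> wallClockFirings = new ArrayList<>();

    PunctuationSimulation(long interval, long startWallClock) {
        this.interval = interval;
        this.nextStreamPunctuation = interval;
        this.nextWallClockPunctuation = startWallClock + interval;
    }

    // One poll-loop iteration; recordTimestamp is null when no record was returned.
    void onPoll(long wallClockNow, Long recordTimestamp) {
        if (recordTimestamp != null) {
            streamTime = Math.max(streamTime, recordTimestamp);
            // Stream-time punctuation can only fire when a record moved stream time.
            while (streamTime >= nextStreamPunctuation) {
                streamTimeFirings.add(nextStreamPunctuation);
                nextStreamPunctuation += interval;
            }
        }
        // The wall-clock callback is checked on every poll, with or without records.
        while (wallClockNow >= nextWallClockPunctuation) {
            wallClockFirings.add(nextWallClockPunctuation);
            nextWallClockPunctuation += interval;
        }
    }
}
```

With an interval of 10, records at wall-clock times 5, 12 and 20 (timestamps 5, 12 and 21), followed by empty polls at 30, 40 and 50, the stream-time trigger fires only at 10 and 20, while the wall-clock trigger fires at 10, 20, 30, 40 and 50. That stall of stream-time punctuation once input stops is the behavior this thread is debating.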
Thanks,
Michal

--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

--
Michal Borowiecki
Senior Software Engineer L4
T: +44 208 742 1600
   +44 203 249 8448
E: michal.borowie...@openbet.com
W: www.openbet.com
OpenBet Ltd
Chiswick Park Building 9
566 Chiswick High Rd
London
W4 5XT
UK

This message is confidential and intended only for the addressee. If you have received this message in error, please immediately notify the postmas...@openbet.com and delete it from your system as well as any copies. The content of e-mails as well as traffic data may be monitored by OpenBet for employment and security purposes. To protect the environment please do not print this e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A company registered in England and Wales. Registered no. 3134634. VAT no. GB927523612