Hi Michal,

Thanks for the KIP. I'd like to propose a bit more of a radical change to the API.

1. Deprecate the punctuate method on Processor.
2. Create a new functional interface just for punctuation, something like:
       interface Punctuator { void punctuate(long timestamp); }
3. Add a new schedule function to ProcessorContext:
       schedule(long interval, PunctuationType type, Punctuator callback)
4. Deprecate the existing schedule function.
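To make the four steps above a bit more concrete, here is a rough sketch of how the pieces could fit together. It is only an illustration under assumptions: SketchContext and its advance() hook are hypothetical stand-ins (not Kafka's ProcessorContext), the enum values are borrowed from Matthias's PunctuationType suggestion quoted further down, and the rule that a timeline starts at 0 is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: Punctuator and PunctuationType mirror the names in the proposal.
// SketchContext is a hypothetical stand-in, not Kafka's ProcessorContext.

enum PunctuationType { EVENT_TIME, SYSTEM_TIME }

@FunctionalInterface
interface Punctuator {
    void punctuate(long timestamp);
}

class SketchContext {
    // One registered schedule: fire `callback` every `interval` units of `type` time.
    private static final class Schedule {
        final long interval;
        final PunctuationType type;
        final Punctuator callback;
        long next; // next point on this schedule's timeline at which to fire

        Schedule(long interval, PunctuationType type, Punctuator callback) {
            this.interval = interval;
            this.type = type;
            this.callback = callback;
            this.next = interval; // assumption: every timeline starts at 0
        }
    }

    private final List<Schedule> schedules = new ArrayList<>();

    // Step 3 of the proposal: the callback is passed in instead of being overridden.
    void schedule(long interval, PunctuationType type, Punctuator callback) {
        schedules.add(new Schedule(interval, type, callback));
    }

    // Hypothetical driver hook: report that time of the given type has reached `now`.
    void advance(PunctuationType type, long now) {
        for (Schedule s : schedules) {
            if (s.type == type && now >= s.next) {
                s.callback.punctuate(now);
                s.next = now + s.interval;
            }
        }
    }
}

class PunctuatorSketch {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        // Two independent schedules on different timelines, each with its own lambda.
        context.schedule(10L, PunctuationType.SYSTEM_TIME,
                ts -> System.out.println("expire old records at " + ts));
        context.schedule(60L, PunctuationType.EVENT_TIME,
                ts -> System.out.println("emit aggregates at " + ts));
        // Only the SYSTEM_TIME schedule is due here.
        context.advance(PunctuationType.SYSTEM_TIME, 12L);
    }
}
```

One thing this shape seems to buy us: a processor can register several callbacks of different types without a switch inside a single punctuate method, and a new type could be added later without touching the Punctuator interface.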
Thoughts?

Thanks,
Damian

On Sun, 16 Apr 2017 at 21:55 Michal Borowiecki <michal.borowie...@openbet.com> wrote:

> Hi Thomas,
>
> I would say our use cases fall in the same category as yours.
>
> 1) One is expiry of old records; it's virtually identical to yours.
>
> 2) The second one is somewhat more convoluted but boils down to the same type of design:
>
> Incoming messages carry a number of fields, including a timestamp.
>
> Outgoing messages contain derived fields, one of which (X) depends on the timestamp input field (Y) and some other input field (Z).
>
> Since the output field X is derived in some non-trivial way, we don't want to force the logic onto downstream apps. Instead we want to calculate it in the Kafka Streams app, which means we re-calculate X as soon as the timestamp in Y is reached (wall-clock time) and send a message if it changed (I say "if" because the derived field (X) is also conditional on another input field Z).
>
> So we have kv stores with the records and an additional kv store with a timestamp->id mapping, which acts like an index on which we periodically do a ranged query.
>
> Initially we naively tried doing it in punctuate, which of course didn't work when there were no regular msgs on the input topic.
> Since this was before 0.10.1 and state stores weren't queryable from outside, we created a "ticker" that produced msgs once per second onto another topic and fed it into the same topology to trigger punctuate.
> This didn't work either, which was much more surprising to us at the time, because it was not obvious at all that punctuate is only triggered if *all* input partitions receive messages regularly.
> In the end we had to break this into 2 separate Kafka Streams. The main transformer doesn't use punctuate but sends values of timestamp field Y and the id to a "scheduler" topic where the periodic ticks are also sent. This is consumed by the second topology and is its only input topic.
> There's a transformer on that topic which populates and updates the time-based indexes and polls them from punctuate. If the time in the timestamp has elapsed, the record id is sent to the main transformer, which updates/deletes the record from the main kv store and forwards the transformed record to the output topic.
>
> To me this setup feels horrendously complicated for what it does.
>
> We could incrementally improve on this since 0.10.1 to poll the timestamp->id "index" stores from some code outside the KafkaStreams topology so that at least we wouldn't need the extra topic for "ticks". However, the ticks don't feel so hacky when you realise they give you some hypothetical benefits in predictability. You can reprocess the messages in a reproducible manner, since the topologies use event-time, just that the event time is simply the wall-clock time fed into a topic by the ticks. (NB in our use case we haven't yet found a need for this kind of reprocessing.) To make that work though, we would have to have the stream time advance based on the presence of msgs on the "tick" topic, regardless of the presence of messages on the other input topic.
>
> Same as in the expiry use case, both the wall-clock triggered punctuate and the hybrid would work to simplify this a lot.
>
> 3) Finally, I have a 3rd use case in the making but I'm still looking at whether we can achieve it using session windows instead. I'll keep you posted if we have to go with punctuate there too.
>
> Thanks,
> Michal
>
> On 11/04/17 20:52, Thomas Becker wrote:
>
> Here's an example that we currently have. We have a streams processor that does a transform from one topic into another. One of the fields in the source topic record is an expiration time, and one of the functions of the processor is to ensure that expired records get deleted promptly after that time passes (typically days or weeks after the message was originally produced).
> To do that, the processor keeps a state store of keys and expiration times, iterates that store in punctuate(), and emits delete (null) records for expired items. This needs to happen at some minimum interval regardless of the incoming message rate of the source topic.
>
> In this scenario, the expiration of records is the primary function of punctuate, and therefore the key requirement is that the wall-clock measured time between punctuate calls has some upper bound. So a pure wall-clock based schedule would be fine for our needs. But the proposed "hybrid" system would also be acceptable if that satisfies a broader range of use cases.
>
> On Tue, 2017-04-11 at 14:41 +0200, Michael Noll wrote:
>
> I apologize for the longer email below. In my defense, it started out much shorter. :-) Also, to be super-clear, I am intentionally playing devil's advocate for a number of arguments brought forth in order to help improve this KIP -- I am not implying I necessarily disagree with the arguments.
>
> That aside, here are some further thoughts.
>
> First, there are (at least?) two categories for actions/behavior you invoke via punctuate():
>
> 1. For internal housekeeping of your Processor or Transformer (e.g., to periodically commit to a custom store, to do metrics/logging). Here, the impact of punctuate is typically not observable by other processing nodes in the topology.
> 2. For controlling the emit frequency of downstream records. Here, the punctuate is all about being observable by downstream processing nodes.
>
> A few releases back, we introduced record caches (DSL) and state store caches (Processor API) in KIP-63. Here, we addressed a concern relating to (2) where some users needed to control -- here: limit -- the downstream output rate of Kafka Streams because the downstream systems/apps would not be able to keep up with the upstream output rate (Kafka scalability > their scalability).
> The argument for KIP-63, which notably did not introduce a "trigger" API, was that such an interaction with downstream systems is an operational concern; it should not impact the processing *logic* of your application, and thus we didn't want to complicate the Kafka Streams API, especially not the declarative DSL, with such operational concerns.
>
> This KIP's discussion on `punctuate()` takes us back in time (<-- sorry, I couldn't resist making this pun :-P). As a meta-comment, I am observing that our conversation is moving more and more into the direction of explicit "triggers" because, so far, I have seen only motivations for use cases in category (2), but none yet for (1)? For example, some comments voiced here are about sth like "IF stream-time didn't trigger punctuate, THEN trigger punctuate based on processing-time". Do we want this, and if so, for which use cases and benefits? Also, on a related note, whatever we are discussing here will impact state store caches (Processor API) and perhaps also impact record caches (DSL), thus we should clarify any such impact here.
>
> Switching topics slightly.
>
> Jay wrote:
>
> > One thing I've always found super important for this kind of design work is to do a really good job of cataloging the landscape of use cases and how prevalent each one is.
>
> +1 to this, as others have already said.
>
> Here, let me highlight -- just in case -- that when we talked about windowing use cases in the recent emails, the Processor API (where `punctuate` resides) does not have any notion of windowing at all. If you want to do windowing *in the Processor API*, you must do so manually in combination with window stores. For this reason I'd suggest to discuss use cases not just in general, but also in view of how you'd do so in the Processor API vs. in the DSL.
> Right now, changing/improving `punctuate` does not impact the DSL at all, unless we add new functionality to it.
>
> Jay wrote in his strawman example:
>
> > You aggregate click and impression data for a reddit like site. Every ten minutes you want to output a ranked list of the top 10 articles ranked by clicks/impressions for each geographical area. I want to be able run this in steady state as well as rerun to regenerate results (or catch up if it crashes).
>
> This is a good example for more than the obvious reason: In KIP-63, we argued that the reason for saying "every ten minutes" above is not necessarily that you want to output data *exactly* after ten minutes, but that you want to perform an aggregation based on 10-minute windows of input data; i.e., the point is about specifying the input for your aggregation, not or less about when the results of the aggregation should be sent downstream. To take an extreme example, you could disable record caches and let your app output a downstream update for every incoming input record. If the last input record was from minute 7 of 10 (for a 10-min window), then what your app would output at minute 10 would be identical to what it had already emitted at minute 7 earlier anyways. This is particularly true when we take late-arriving data into account: if a late record arrived at minute 13, your app would (by default) send a new update downstream, even though the "original" 10 minutes have already passed.
>
> Jay wrote...:
>
> > There are a couple of tricky things that seem to make this hard with either of the options proposed:
> > 1. If I emit this data using event time I have the problem described where a geographical region with no new clicks or impressions will fail to output results.
>
> ...and Arun Mathew wrote:
>
> > We window by the event time, but trigger punctuate in <punctuate interval> duration of system time, in the absence of an event crossing the punctuate event time.
>
> So, given what I wrote above about the status quo and what you can already do with it, is the concern that the state store cache doesn't give you *direct* control over "forcing an output after no later than X seconds [of processing-time]" but only indirect control through a cache size? (Note that I am not dismissing the claims why this might be helpful.)
>
> Arun Mathew wrote:
>
> > We are using Kafka Stream for our Audit Trail, where we need to output the event counts on each topic on each cluster aggregated over a 1 minute window. We have to use event time to be able to cross check the counts. But we need to trigger punctuate [aggregate event pushes] by system time in the absence of events. Otherwise the event counts for unexpired windows would be 0 which is bad.
>
> Isn't the latter -- "count would be 0" -- the problem between the absence of output vs. an output of 0, similar to the use of `Option[T]` in Scala and the difference between `None` and `Some(0)`? That is, isn't the root cause that the downstream system interprets the absence of output in a particular way ("No output after 1 minute = I consider the output to be 0.")? Arguably, you could also adapt the downstream system (if possible) to correctly handle the difference between absence of output vs. output of 0. I am not implying that we shouldn't care about such a use case, but want to understand the motivation better. :-)
>
> Also, to add some perspective, in some related discussions we talked about how a Kafka Streams application should not worry or not be coupled unnecessarily with such interpretation specifics in a downstream system's behavior.
> After all, tomorrow your app's output might be consumed by more than just this one downstream system. Arguably, Kafka Connect rather than Kafka Streams might be the best tool to link the universes of Kafka and downstream systems, including helping to reconcile the differences in how these systems interpret changes, updates, late-arriving data, etc. Kafka Connect would allow you to decouple the Kafka Streams app's logical processing from the specifics of downstream systems, thanks to specific sink connectors (e.g. for Elastic, Cassandra, MySQL, S3, etc). Would this decoupling with Kafka Connect help here? (And if the answer is "Yes, but it's currently awkward to use Connect for this", this might be a problem we can solve, too.)
>
> Switching topics slightly again.
>
> Thomas wrote:
>
> > I'm not entirely convinced that a separate callback (option C) is that messy (it could just be a default method with an empty implementation), but if we wanted a single API to handle both cases, how about something like the following?
> >
> > enum Time {
> >    STREAM,
> >    CLOCK
> > }
>
> Yeah, I am on the fence here, too. If we use the 1-method approach, then whatever the user is doing inside this method is a black box to Kafka Streams (similar to how we have no idea what the user does inside a `foreach` -- if the function passed to `foreach` writes to external systems, then Kafka Streams is totally unaware of the fact). We won't know, for example, if the stream-time action has a smaller "trigger" frequency than the processing-time action. Or, we won't know whether the user custom-codes a "not later than" trigger logic ("Do X every 1-minute of stream-time or 1-minute of processing-time, whichever comes first"). That said, I am not certain yet whether we would need such knowledge because, when using the Processor API, most of the work and decisions must be done by the user anyways.
> It would matter though if the concept of "triggers" were to bubble up into the DSL because in the DSL the management of windowing, window stores, etc. must be done automatically by Kafka Streams.
>
> [In any case, btw, we have the corner case where the user configured the stream-time to be processing-time (e.g. via wall-clock timestamp extractor), at which point both punctuate variants are based on the same time semantics / timeline.]
>
> Again, I apologize for the wall of text. Congratulations if you made it this far. :-)
>
> More than happy to hear your thoughts!
> Michael
>
> On Tue, Apr 11, 2017 at 1:12 AM, Arun Mathew <arunmathe...@gmail.com> wrote:
>
> Thanks Matthias.
> Sure, will correct it right away.
>
> On 11-Apr-2017 8:07 AM, "Matthias J. Sax" <matth...@confluent.io> wrote:
>
> Thanks for preparing this page!
>
> About terminology:
>
> You introduce the term "event time" -- but we should call this "stream time" -- "stream time" is whatever TimestampExtractor returns and this could be event time, ingestion time, or processing/wall-clock time.
>
> Does this make sense to you?
>
> -Matthias
>
> On 4/10/17 4:58 AM, Arun Mathew wrote:
>
> Thanks Ewen.
>
> @Michal, @all, I have created a child page to start the Use Cases discussion [https://cwiki.apache.org/confluence/display/KAFKA/Punctuate+Use+Cases]. Please go through it and give your comments.
>
> @Tianji, Sorry for the delay. I am trying to make the patch public.
>
> --
> Arun Mathew
>
> On 4/8/17, 02:00, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> Arun,
>
> I've given you permission to edit the wiki. Let me know if you run into any issues.
>
> -Ewen
>
> On Fri, Apr 7, 2017 at 1:21 AM, Arun Mathew <amat...@yahoo-corp.jp> wrote:
>
> Thanks Michal. I don’t have the access yet [arunmathew88].
> Should I be sending a separate mail for this?
> I thought one of the people following this thread would be able to give me access.
>
> *From: *Michal Borowiecki <michal.borowie...@openbet.com>
> *Reply-To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
> *Date: *Friday, April 7, 2017 at 17:16
> *To: *"dev@kafka.apache.org" <dev@kafka.apache.org>
> *Subject: *Re: [DISCUSS] KIP-138: Change punctuate semantics
>
> Hi Arun,
>
> I was thinking along the same lines as you, listing the use cases on the wiki, but didn't find time to get around to doing that yet.
> Don't mind if you do it if you have access now.
> I was thinking it would be nice if, once we have the use cases listed, people could use likes to up-vote the use cases similar to what they're working on.
>
> I should have a bit more time to action this in the next few days, but happy for you to do it if you can beat me to it ;-)
>
> Cheers,
> Michal
>
> On 07/04/17 04:39, Arun Mathew wrote:
>
> Sure, Thanks Matthias. My id is [arunmathew88].
>
> Of course. I was thinking of a subpage where people can collaborate.
>
> Will do as per Michael’s suggestion.
>
> Regards,
> Arun Mathew
>
> On 4/7/17, 12:30, "Matthias J. Sax" <matth...@confluent.io> wrote:
>
> Please share your Wiki-ID and a committer can give you write access.
>
> Btw: as you did not initiate the KIP, you should not change the KIP without the permission of the original author -- in this case Michael.
>
> So you might also just share your thought over the mailing list and Michael can update the KIP page. Or, as an alternative, just create a subpage for the KIP page.
>
> @Michael: WDYT?
>
> -Matthias
>
> On 4/6/17 8:05 PM, Arun Mathew wrote:
>
> Hi Jay,
> Thanks for the advice, I would like to list down the use cases as per your suggestion. But it seems I don't have write permission to the Apache Kafka Confluent Space. Whom shall I request for it?
>
> Regarding your last question. We are using a patch in our production system which does exactly this.
> We window by the event time, but trigger punctuate in <punctuate interval> duration of system time, in the absence of an event crossing the punctuate event time.
>
> We are using Kafka Stream for our Audit Trail, where we need to output the event counts on each topic on each cluster aggregated over a 1 minute window. We have to use event time to be able to cross check the counts. But we need to trigger punctuate [aggregate event pushes] by system time in the absence of events. Otherwise the event counts for unexpired windows would be 0 which is bad.
>
> "Maybe a hybrid solution works: I window by event time but trigger results by system time for windows that have updated? Not really sure the details of making that work. Does that work? Are there concrete examples where you actually want the current behavior?"
>
> --
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation
>
> On Wed, Apr 5, 2017 at 8:48 PM, Tianji Li <skyah...@gmail.com> wrote:
>
> Hi Jay,
>
> The hybrid solution is exactly what I expect and need for our use cases when dealing with telecom data.
>
> Thanks
> Tianji
>
> On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:
>
> Hey guys,
>
> One thing I've always found super important for this kind of design work is to do a really good job of cataloging the landscape of use cases and how prevalent each one is. By that I mean not just listing lots of uses, but also grouping them into categories that functionally need the same thing. In the absence of this it is very hard to reason about design proposals. From the proposals so far I think we have a lot of discussion around possible apis, but less around what the user needs for different use cases and how they would implement that using the api.
>
> Here is an example:
> You aggregate click and impression data for a reddit like site. Every ten minutes you want to output a ranked list of the top 10 articles ranked by clicks/impressions for each geographical area. I want to be able run this in steady state as well as rerun to regenerate results (or catch up if it crashes).
>
> There are a couple of tricky things that seem to make this hard with either of the options proposed:
> 1. If I emit this data using event time I have the problem described where a geographical region with no new clicks or impressions will fail to output results.
> 2. If I emit this data using system time I have the problem that when reprocessing data my window may not be ten minutes but 10 hours if my processing is very fast so it dramatically changes the output.
>
> Maybe a hybrid solution works: I window by event time but trigger results by system time for windows that have updated? Not really sure the details of making that work. Does that work? Are there concrete examples where you actually want the current behavior?
>
> -Jay
>
> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com> wrote:
>
> Hi All,
>
> Thanks for the KIP. We were also in need of a mechanism to trigger punctuate in the absence of events.
>
> As I described in [https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036],
>
> - Our approach involved using the event time by default.
> - The method to check if there is any punctuate ready in the PunctuationQueue is triggered via any event received by the stream thread, or at the polling intervals in the absence of any events.
> - When we create Punctuate objects (which contains the next event time for punctuation and interval), we also record the creation time (system time).
> - While checking for maturity of Punctuate Schedule by mayBePunctuate method, we also check if the system clock has elapsed the punctuate interval since the schedule creation time.
> - In the absence of any event, or in the absence of any event for one topic in the partition group assigned to the stream task, the system time will elapse the interval and we trigger a punctuate using the expected punctuation event time.
> - We then create the next punctuation schedule as punctuation event time + punctuation interval, [again recording the system time of creation of the schedule].
>
> We call this a Hybrid Punctuate. Of course, this approach has pros and cons.
>
> Pros
>
> - Punctuates will happen in <punctuate interval> time duration at max in terms of system time.
> - The semantics as a whole continues to revolve around event time.
> - We can use the old data [old timestamps] to rerun any experiments or tests.
>
> Cons
>
> - In case the <punctuate interval> is not a time duration [say logical time/event count], then the approach might not be meaningful.
> - In case there is a case where we have to wait for an actual event from a low event rate partition in the partition group, this approach will jump the gun.
> - In case the event processing cannot catch up with the event rate and the expected timestamp events get queued for a long time, this approach might jump the gun.
>
> I believe the above approach and discussion goes close to the approach A.
>
> -----------
>
> I like the idea of having an event count based punctuate.
>
> -----------
>
> I agree with the discussion around approach C, that we should provide the user with the option to choose system time or event time based punctuates. But I believe that the user predominantly wants to use event time while not missing out on regular punctuates due to event delays or event absences. Hence a complex punctuate option as Matthias mentioned (quoted below) would be most apt.
>
> "- We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time whatever comes first)."
>
> -----------
>
> I think I read somewhere that Kafka Streams started with System Time as the punctuation standard, but was later changed to Event Time. I guess there would be some good reason behind it.
> As Kafka Streams wants to evolve more on the Stream Processing front, I believe the emphasis on event time would remain quite strong.
>
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation, Tokyo
>
> On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com> wrote:
>
> Yeah I like PunctuationType much better; I just threw Time out there more as a strawman than an actual suggestion ;) I still think it's worth considering what this buys us over an additional callback. I foresee a number of punctuate implementations following this pattern:
>
> public void punctuate(PunctuationType type) {
>     switch (type) {
>         case EVENT_TIME:
>             methodA();
>             break;
>         case SYSTEM_TIME:
>             methodB();
>             break;
>     }
> }
>
> I guess one advantage of this approach is we could add additional punctuation types later in a backwards compatible way (like event count as you mentioned).
>
> -Tommy
>
> On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
>
> That sounds promising.
>
> I am just wondering if `Time` is the best name. Maybe we want to add other non-time based punctuations at some point later. I would suggest
>
> enum PunctuationType {
>     EVENT_TIME,
>     SYSTEM_TIME,
> }
>
> or similar.
> Just to keep the door open -- it's easier to add new stuff if the name is more generic.
>
> -Matthias
>
> On 4/4/17 5:30 AM, Thomas Becker wrote:
>
> I agree that the framework providing and managing the notion of stream time is valuable and not something we would want to delegate to the tasks. I'm not entirely convinced that a separate callback (option C) is that messy (it could just be a default method with an empty implementation), but if we wanted a single API to handle both cases, how about something like the following?
>
> enum Time {
>    STREAM,
>    CLOCK
> }
>
> Then on ProcessorContext:
> context.schedule(Time time, long interval) // We could allow this to be called once for each value of time to mix approaches.
>
> Then the Processor API becomes:
> punctuate(Time time) // time here denotes which schedule resulted in this call.
>
> Thoughts?
>
> On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
>
> Thanks a lot for the KIP Michal,
>
> I was thinking about the four options you proposed in more detail and these are my thoughts:
>
> (A) You argue that users can still "punctuate" on event-time via process(), but I am not sure if this is possible. Note that users only get record timestamps via context.timestamp(). Thus, users would need to track the time progress per partition (based on the partitions they observe via context.partition()). (This alone puts a huge burden on the user by itself.) However, users are not notified at startup what partitions are assigned, and users are not notified when partitions get revoked. Because this information is not available, it's not possible to "manually advance" stream-time, and thus event-time punctuation within process() seems not to be possible -- or do you see a way to get it done? And even if, it might still be too clumsy to use.
>
> (B) This does not allow to mix both approaches, thus limiting what users can do.
>
> (C) This should give all flexibility we need.
>>>>>>>> However, just adding one more method seems to be a solution that is too simple (cf. my comments below).
>>>>>>>>
>>>>>>>> (D) This might be hard to use. Also, I am not sure how a user could enable system-time and event-time punctuation in parallel.
>>>>>>>>
>>>>>>>> Overall, option (C) seems to be the most promising approach to me. Because I also favor a clean API, we might keep the current punctuate() as-is, but deprecate it -- so we can remove it at some later point when people use the "new punctuate API".
>>>>>>>>
>>>>>>>> Couple of follow up questions:
>>>>>>>>
>>>>>>>> - I am wondering if we should have two callback methods or just one (i.e., a unified one for system and event time punctuation, or one for each?).
>>>>>>>>
>>>>>>>> - If we have one, how can the user figure out which condition triggered it?
>>>>>>>>
>>>>>>>> - What would the API look like for registering different punctuate schedules? The "type" must be somehow defined?
>>>>>>>>
>>>>>>>> - We might want to add "complex" schedules later on (like, punctuate on every 10 seconds event-time or 60 seconds system-time, whichever comes first).
>>>>>>>> I don't say we should add this right away, but we might want to define the API in a way that allows extensions like this later on, without redesigning the API (i.e., the API should be designed to be extensible).
>>>>>>>>
>>>>>>>> - Did you ever consider count-based punctuation?
>>>>>>>>
>>>>>>>> I understand that you would like to solve a simple problem, but we learned from the past that just "adding some API" quickly leads to a not very well defined API that needs time-consuming clean up later on via other KIPs. Thus, I would prefer to make this a holistic punctuation KIP from the beginning, to avoid a painful redesign later.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 4/3/17 11:58 AM, Michal Borowiecki wrote:
>>>>>>>>> Thanks Thomas,
>>>>>>>>>
>>>>>>>>> I'm also wary of changing the existing semantics of punctuate, for backward compatibility reasons, although I like the conceptual simplicity of that option.
>>>>>>>>>
>>>>>>>>> Adding a new method to me feels safer but, in a way, uglier. I added this to the KIP now as option (C).
>>>>>>>>>
>>>>>>>>> The TimestampExtractor mechanism is actually more flexible, as it allows you to return any value; you're not limited to event time or system time (although I don't see an actual use case where you might need anything other than those two). Hence I also proposed the option to allow users to, effectively, decide what "stream time" is for them given the presence or absence of messages, much like they can decide what msg time means for them using the TimestampExtractor. What do you think about that? This is probably most flexible but also most complicated.
>>>>>>>>>
>>>>>>>>> All comments appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Michal
>>>>>>>>>
>>>>>>>>> On 03/04/17 19:23, Thomas Becker wrote:
>>>>>>>>>> Although I fully agree we need a way to trigger periodic processing that is independent from whether and when messages arrive, I'm not sure I like the idea of changing the existing semantics across the board. What if we added an additional callback to Processor that can be scheduled similarly to punctuate() but was always called at fixed, wall clock based intervals?
>>>>>>>>>> This way you wouldn't have to give up the notion of stream time to be able to do periodic processing.
>>>>>>>>>>
>>>>>>>>>> On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have created a draft for KIP-138: Change punctuate semantics
>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
>>>>>>>>>>>
>>>>>>>>>>> Appreciating there can be different views on system-time vs event-time semantics for punctuation depending on use-case and the importance of backwards compatibility of any such change, I've left it quite open and hope to fill in more info as the discussion progresses.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Michal
>>>>>>> --
>>>>>>> Tommy Becker
>>>>>>> Senior Software Engineer
>>>>>>> tivo.com
> --
> Michal Borowiecki
> Senior Software Engineer L4
> OpenBet Ltd
> www.openbet.com
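
For illustration, the single-API idea from Thomas's 4/4/17 message quoted above (an enum selecting the time domain, passed to both schedule and punctuate) might look roughly like the following in plain Java. This is only a toy under stated assumptions, not Kafka Streams code: `SketchProcessor`, `SketchContext`, `advance` and `PunctuateSketch` are hypothetical names invented here, and the rule that the first observed time merely anchors a schedule is an assumption rather than anything proposed in the thread.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// The enum from the thread: which notion of time drives a schedule.
enum Time { STREAM, CLOCK }

// Hypothetical stand-in for Processor; `time` says which schedule fired.
interface SketchProcessor {
    void punctuate(Time time);
}

// Hypothetical stand-in for ProcessorContext: one interval per time domain.
final class SketchContext {
    private final SketchProcessor processor;
    private final Map<Time, Long> intervals = new EnumMap<>(Time.class);
    private final Map<Time, Long> nextDue = new EnumMap<>(Time.class);

    SketchContext(SketchProcessor processor) {
        this.processor = processor;
    }

    // May be called once per Time value to mix both approaches.
    void schedule(Time time, long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        intervals.put(time, interval);
        nextDue.remove(time);
    }

    // Assumption: an imaginary framework calls this whenever the given clock advances.
    void advance(Time time, long now) {
        Long interval = intervals.get(time);
        if (interval == null) {
            return; // nothing scheduled for this time domain
        }
        Long first = nextDue.get(time);
        if (first == null) {
            // Assumption: the first observation only anchors the schedule.
            nextDue.put(time, now + interval);
            return;
        }
        long due = first;
        while (now >= due) {
            processor.punctuate(time);
            due += interval;
        }
        nextDue.put(time, due);
    }
}

final class PunctuateSketch {
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        SketchContext ctx = new SketchContext(time -> calls.add(time.name()));
        ctx.schedule(Time.STREAM, 10);
        ctx.schedule(Time.CLOCK, 5);
        ctx.advance(Time.CLOCK, 0);    // anchors the CLOCK schedule at 5
        ctx.advance(Time.STREAM, 100); // anchors the STREAM schedule at 110
        ctx.advance(Time.CLOCK, 5);    // CLOCK fires
        ctx.advance(Time.CLOCK, 12);   // CLOCK fires again (due at 10)
        ctx.advance(Time.STREAM, 110); // STREAM fires
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the CLOCK schedule keeps firing while no stream-time progress is reported, which is the behaviour the thread is asking for; whether a single punctuate(Time) callback or separate callbacks is preferable is left open, as in the discussion.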