Hi, thanks for updating the KIP. Looks good to me overall.
I think adding `Cancellable` (or should it be `Cancelable`, to follow American English?) is a clean solution, in contrast to the proposed alternative. One minor comment: can you add `ValueTransformer#punctuate()` to the list of deprecated methods?

-Matthias

On 5/4/17 1:41 AM, Michal Borowiecki wrote:
> Further in this direction, I've updated the main proposal to incorporate
> the Cancellable return type for ProcessorContext.schedule and the
> guidance on how to implement "hybrid" punctuation with the proposed two
> PunctuationTypes.
>
> I look forward to more comments on whether the Cancellable return type is
> an agreeable solution, and on its precise definition.
>
> I shall move all alternatives other than the main proposal into the
> Rejected Alternatives section; if I hear any objections, I'll move
> those back up and we'll discuss further.
>
> Looking forward to all comments and suggestions.
>
> Thanks,
> Michal
>
> On 01/05/17 18:23, Michal Borowiecki wrote:
>> Hi all,
>>
>> As promised, here is my take on how one could implement the previously
>> discussed hybrid semantics using the two PunctuationType callbacks (one
>> for STREAM_TIME and one for SYSTEM_TIME).
>>
>> However, there's a twist.
>>
>> Since calling context.schedule() currently adds a new
>> PunctuationSchedule and does not overwrite the previous one, a slight
>> change would be required:
>>
>> a) either PunctuationSchedules are made cancellable,
>>
>> b) or calling schedule() overwrites (cancels) the previous schedule
>> with the given PunctuationType (but that's not how it works currently).
>>
>> Below is an example assuming approach a) is implemented by having
>> schedule() return Cancellable instead of void.
>>
>> ProcessorContext context;
>> long streamTimeInterval = ...;
>> long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
>> Cancellable streamTimeSchedule;
>> Cancellable systemTimeSchedule;
>> long lastStreamTimePunctuation = -1;
>>
>> public void init(ProcessorContext context) {
>>     this.context = context;
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void streamTimePunctuate(long streamTime) {
>>     periodicBusiness(streamTime);
>>
>>     // stream time is advancing: push the system-time fallback further out
>>     systemTimeSchedule.cancel();
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void systemTimePunctuate(long systemTime) {
>>     periodicBusiness(context.timestamp());
>>
>>     // no stream-time punctuation within the upper bound: restart the stream-time interval
>>     streamTimeSchedule.cancel();
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>> }
>>
>> public void periodicBusiness(long streamTime) {
>>     // guard against streamTime == -1, easy enough.
>>     // if you need system time instead, just use System.currentTimeMillis()
>>
>>     // do something businessy here
>> }
>>
>> where Cancellable is an interface containing either just a single
>> void cancel() method, or also a boolean isCancelled() method, like Akka's
>> <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
>>
>> Please make your opinions known on whether we should proceed in this
>> direction or leave "hybrid" considerations out of scope.
>>
>> Looking forward to hearing your thoughts.
>>
>> Thanks,
>> Michal
>>
>> On 30/04/17 20:07, Michal Borowiecki wrote:
>>> Hi Matthias,
>>>
>>> I'd like to start moving the discarded ideas into the Rejected
>>> Alternatives section.
>>> Before I do, I want to tidy them up and ensure
>>> they've each been given proper treatment.
>>>
>>> To that end, let me go back to one of your earlier comments about the
>>> original suggestion (A), to put that to bed.
>>>
>>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>>> (A) You argue that users can still "punctuate" on event-time via
>>>> process(), but I am not sure if this is possible. Note that users only
>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>> track the time progress per partition (based on the partitions they
>>>> observe via context.partition()). (This alone puts a huge burden on the
>>>> user.) However, users are not notified at startup which
>>>> partitions are assigned, and users are not notified when partitions get
>>>> revoked. Because this information is not available, it's not possible to
>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>> process() seems not to be possible -- or do you see a way to get it
>>>> done? And even if so, it might still be too clumsy to use.
>>>
>>> I might have missed something, but I'm guessing your worry about users
>>> having to track time progress /per partition/ comes from what
>>> stream-time does currently.
>>> But I'm not sure that those semantics of stream-time are ideal for
>>> users of punctuate.
>>> That is, if stream-time punctuate didn't exist and users had to use
>>> process(), would they actually want to use the current semantics of
>>> stream time?
>>>
>>> As a reminder, stream time, in all its glory, is the following (not exactly,
>>> actually: while trying to be absolutely precise here I spotted
>>> KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144>, so I
>>> think this approximation suffices to illustrate the point):
>>>
>>>   a minimum across all input partitions of (
>>>     if (msgs never received by partition) -1;
>>>     else {
>>>       a non-descending minimum of (the per-batch minimum msg timestamp)
>>>     }
>>>   )
>>>
>>> Would that really be clear enough to the users of punctuate? Do they
>>> care for such a convoluted notion of time? I see how this can be
>>> useful for StreamTask to pick the next partition to take a record
>>> from, but for punctuate?
>>> If users had to implement punctuation with process(), is that what
>>> they would have chosen as their notion of time?
>>> I'd argue not.
>>>
>>> None of the processors implementing the rich windowing/join
>>> operations in the DSL use punctuate.
>>> Let's take KStreamKStreamJoinProcessor as an example: its
>>> process() method simply uses context().timestamp(), which, since
>>> it's called from process(), returns, per the javadoc:
>>>
>>>   "If it is triggered while processing a record streamed from the source
>>>   processor, timestamp is defined as the timestamp of the current input
>>>   record;"
>>>
>>> So these processors don't use that convoluted formula for stream-time.
>>> Instead, they only care about the timestamp of the current record. I think
>>> that having users track just that wouldn't be much of a burden.
>>> I don't think they need to care about which partitions got assigned
>>> or not. And StreamTask would still pick records first from the
>>> partition with the lowest timestamp to try to "synchronize" the
>>> streams, as it does now.
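The approximate stream-time formula quoted above can be made concrete with a small sketch. It is a hypothetical illustration, not StreamTask's actual code: the class name, the `onBatch` hook that receives one fetched batch's timestamps, and the sample values are all assumptions, and it ignores the details that made Michal call the formula an approximation. Each partition's time is -1 until the partition delivers a batch and then never moves backwards; stream time is the minimum over partitions.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the approximate stream-time formula; not Kafka code.
public class StreamTimeApproximation {

    // Partition id -> partition time; -1 means "no records received yet".
    private final Map<Integer, Long> partitionTime = new HashMap<>();

    public StreamTimeApproximation(int... partitions) {
        for (int p : partitions) {
            partitionTime.put(p, -1L);
        }
    }

    // Hypothetical hook: called with the timestamps of one batch fetched for a partition.
    public void onBatch(int partition, long... timestamps) {
        if (timestamps.length == 0) {
            return;
        }
        long batchMin = Arrays.stream(timestamps).min().getAsLong();
        // Non-descending: a batch with an older minimum never moves partition time back.
        partitionTime.merge(partition, batchMin, (current, candidate) -> Math.max(current, candidate));
    }

    // Minimum across all input partitions; -1 while any partition has delivered nothing.
    public long streamTime() {
        return partitionTime.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(-1L);
    }

    public static void main(String[] args) {
        StreamTimeApproximation st = new StreamTimeApproximation(0, 1);
        st.onBatch(0, 100, 90, 120);
        System.out.println(st.streamTime()); // -1: partition 1 has delivered nothing yet
        st.onBatch(1, 95, 110);
        System.out.println(st.streamTime()); // 90
        st.onBatch(0, 80, 130);
        System.out.println(st.streamTime()); // 90: partition 0 does not go back to 80
        st.onBatch(0, 140);
        System.out.println(st.streamTime()); // 95: now held back by partition 1
    }
}
```

The trace shows why Matthias's concern and Michal's question are linked: a single silent partition pins stream time at -1, and the value a punctuation would see depends on every partition's progress rather than on the record being processed.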
>>>
>>> What users would have to do in their Processor implementations is
>>> something along the lines of:
>>>
>>> long lastPunctuationTime = -1; // -1: no record seen yet
>>> long interval = <some-number>; // millis
>>>
>>> @Override
>>> public void process(K key, V value) {
>>>     // ctx is the ProcessorContext saved in init()
>>>     if (lastPunctuationTime < 0) {
>>>         // start at the first record's timestamp; starting at 0 would
>>>         // trigger a burst of catch-up punctuations on the first record
>>>         lastPunctuationTime = ctx.timestamp();
>>>     }
>>>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>>>         punctuate(ctx.timestamp());
>>>         // I'm not sure of the merit of this vs lastPunctuationTime = ctx.timestamp();
>>>         // but that's what PunctuationQueue does currently
>>>         lastPunctuationTime += interval;
>>>     }
>>>     // do some other business logic here
>>> }
>>>
>>> Looking forward to your thoughts.
>>>
>>> Cheers,
>>> Michal
>>>
>>> --
>>> Signature
>>> <http://www.openbet.com/> Michal Borowiecki
>>> Senior Software Engineer L4
>>> T: +44 208 742 1600
>>>
>>> +44 203 249 8448
>>>
>>> E: michal.borowie...@openbet.com
>>> W: www.openbet.com <http://www.openbet.com/>
>>>
>>> OpenBet Ltd
>>> Chiswick Park Building 9
>>> 566 Chiswick High Rd
>>> London
>>> W4 5XT
>>> UK
>>>
>>> <https://www.openbet.com/email_promo>
>>>
>>> This message is confidential and intended only for the addressee. If
>>> you have received this message in error, please immediately notify
>>> the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete
>>> it from your system as well as any copies. The content of e-mails as
>>> well as traffic data may be monitored by OpenBet for employment and
>>> security purposes. To protect the environment please do not print
>>> this e-mail unless necessary. OpenBet Ltd. Registered Office:
>>> Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
>>> United Kingdom. A company registered in England and Wales. Registered
>>> no. 3134634. VAT no.
>>> GB927523612
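The hybrid scheme from the 01/05 message, where each punctuation cancels and re-registers the other, can be exercised without a Kafka cluster. The sketch below is a toy under stated assumptions: `Cancellable` is the single-method `cancel()` variant described in that message, while `ToyScheduler`, its `advance` method, the hand-driven clocks and the "fire once, then re-arm at now + interval" rule are made up for illustration and are not Kafka's PunctuationQueue semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class HybridPunctuationSketch {

    // Hypothetical handle returned by schedule(): the cancel()-only variant from the thread.
    interface Cancellable {
        void cancel();
    }

    enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

    // Toy stand-in for ProcessorContext.schedule(), NOT Kafka's PunctuationQueue:
    // a schedule fires when its clock reaches `next`, then re-arms at now + interval.
    static final class ToyScheduler {
        private static final class Schedule implements Cancellable {
            final PunctuationType type;
            final long interval;
            final LongConsumer callback;
            long next;
            boolean cancelled;
            Schedule(PunctuationType type, long interval, LongConsumer callback, long next) {
                this.type = type;
                this.interval = interval;
                this.callback = callback;
                this.next = next;
            }
            @Override public void cancel() { cancelled = true; }
        }

        private final List<Schedule> schedules = new ArrayList<>();
        private long streamTime = 0;
        private long systemTime = 0;

        Cancellable schedule(PunctuationType type, long interval, LongConsumer callback) {
            long now = type == PunctuationType.STREAM_TIME ? streamTime : systemTime;
            Schedule s = new Schedule(type, interval, callback, now + interval);
            schedules.add(s);
            return s;
        }

        // Moves one clock forward and fires that clock's due, non-cancelled schedules.
        void advance(PunctuationType type, long now) {
            if (type == PunctuationType.STREAM_TIME) streamTime = now; else systemTime = now;
            // Iterate over a copy: callbacks cancel schedules and register new ones.
            for (Schedule s : new ArrayList<>(schedules)) {
                if (s.type == type && !s.cancelled && now >= s.next) {
                    s.next = now + s.interval;
                    s.callback.accept(now);
                }
            }
            schedules.removeIf(s -> s.cancelled);
        }
    }

    // Follows the 01/05 example: whichever punctuation fires resets the other one.
    static final class HybridPunctuator {
        private final ToyScheduler scheduler;
        private final long streamTimeInterval;
        private final long systemTimeUpperBound;
        private Cancellable streamTimeSchedule;
        private Cancellable systemTimeSchedule;
        final List<String> fired = new ArrayList<>();

        HybridPunctuator(ToyScheduler scheduler, long streamTimeInterval, long systemTimeUpperBound) {
            this.scheduler = scheduler;
            this.streamTimeInterval = streamTimeInterval;
            this.systemTimeUpperBound = systemTimeUpperBound;
            streamTimeSchedule = scheduler.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
            systemTimeSchedule = scheduler.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
        }
        void streamTimePunctuate(long streamTime) {
            fired.add("stream@" + streamTime);
            systemTimeSchedule.cancel(); // stream time advances: push the fallback further out
            systemTimeSchedule = scheduler.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
        }
        void systemTimePunctuate(long systemTime) {
            fired.add("system@" + systemTime);
            streamTimeSchedule.cancel(); // no stream-time punctuation in time: restart its interval
            streamTimeSchedule = scheduler.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
        }
    }

    static List<String> runScenario() {
        ToyScheduler clock = new ToyScheduler();
        HybridPunctuator p = new HybridPunctuator(clock, 10, 15);
        clock.advance(PunctuationType.SYSTEM_TIME, 8);
        clock.advance(PunctuationType.STREAM_TIME, 10); // stream fires; fallback re-armed for 23
        clock.advance(PunctuationType.SYSTEM_TIME, 16); // nothing: fallback was pushed out
        clock.advance(PunctuationType.STREAM_TIME, 20); // stream fires; fallback re-armed for 31
        clock.advance(PunctuationType.SYSTEM_TIME, 31); // stream time stalled: system fires
        clock.advance(PunctuationType.SYSTEM_TIME, 46); // still stalled: system fires again
        clock.advance(PunctuationType.STREAM_TIME, 30); // records resume: stream fires
        return p.fired;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // [stream@10, stream@20, system@31, system@46, stream@30]
    }
}
```

While records keep stream time moving, the SYSTEM_TIME callback never fires, because every stream-time punctuation pushes it out by the upper bound; once stream time stalls, the wall-clock callback takes over at the upper-bound rate until records resume.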
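The inline question in the process()-based example of the 30/04 message, whether to advance lastPunctuationTime by `interval` or set it to the current record timestamp, can be checked by replaying the same timestamps through both rules. This is a hypothetical sketch: `ProcessDrivenPunctuation`, `replay` and the sample timestamps are made up for illustration, and it assumes counting starts at the first record's timestamp and that punctuate() receives the record timestamp, as in that example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Kafka code: replays record timestamps through the
// process()-driven punctuation loop and collects the values passed to punctuate().
public class ProcessDrivenPunctuation {

    // advanceByInterval = true:  lastPunctuationTime += interval
    // advanceByInterval = false: lastPunctuationTime = record timestamp
    static List<Long> replay(long[] recordTimestamps, long interval, boolean advanceByInterval) {
        List<Long> punctuations = new ArrayList<>();
        long lastPunctuationTime = -1; // -1: no record seen yet
        for (long ts : recordTimestamps) {
            if (lastPunctuationTime < 0) {
                lastPunctuationTime = ts; // start counting at the first record
            }
            while (ts >= lastPunctuationTime + interval) {
                punctuations.add(ts); // stands in for punctuate(ctx.timestamp())
                if (advanceByInterval) {
                    lastPunctuationTime += interval;
                } else {
                    lastPunctuationTime = ts;
                }
            }
        }
        return punctuations;
    }

    public static void main(String[] args) {
        long[] ts = {0, 4, 12, 13, 35};
        System.out.println(replay(ts, 10, true));  // [12, 35, 35]
        System.out.println(replay(ts, 10, false)); // [12, 35]
    }
}
```

With these timestamps, the `+= interval` rule catches up on the gap between 13 and 35 with two punctuations (both receiving 35) and keeps the schedule on a grid of 10 from the first record, so the next one is due at 40. Assigning the record timestamp yields a single punctuation and lets the schedule drift, so the next one is due at 45.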