Hi,

thanks for updating the KIP. Looks good to me overall.

I think adding `Cancellable` (or should it be `Cancelable` to follow
American English?) is a clean solution, in contrast to the proposed
alternative.

One minor comment: can you add `ValueTransformer#punctuate()` to the
list of deprecated methods?


-Matthias



On 5/4/17 1:41 AM, Michal Borowiecki wrote:
> Continuing in this direction, I've updated the main proposal to
> incorporate the Cancellable return type for ProcessorContext.schedule,
> along with guidance on how to implement "hybrid" punctuation with the
> two proposed PunctuationTypes.
> 
> I look forward to more comments on whether the Cancellable return type
> is an agreeable solution, and on its precise definition.
> 
> I shall move all alternatives other than the main proposal into the
> Rejected Alternatives section; if I hear any objections, I'll move
> those back up and we'll discuss further.
> 
> 
> Looking forward to all comments and suggestions.
> 
> 
> Thanks,
> 
> Michal
> 
> 
> On 01/05/17 18:23, Michal Borowiecki wrote:
>>
>> Hi all,
>>
>> As promised, here is my take on how one could implement the previously
>> discussed hybrid semantics using the two PunctuationType callbacks (one
>> for STREAM_TIME and one for SYSTEM_TIME).
>>
>> However, there's a twist.
>>
>> Since currently calling context.schedule() adds a new
>> PunctuationSchedule and does not overwrite the previous one, a slight
>> change would be required:
>>
>> a) either PunctuationSchedules become cancellable,
>>
>> b) or calling schedule() overwrites (cancels) the previous schedule
>> with the same PunctuationType (but that's not how it works currently).
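Option b) could be modelled as a map from PunctuationType to the single active schedule, where scheduling again cancels whatever was registered before. This is only a sketch under that assumption: ReplacingScheduler, its Schedule class and the local PunctuationType enum are hypothetical names for illustration, not Kafka Streams classes.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch of option b): at most one schedule per PunctuationType. */
public class ReplacingScheduler {

    // Hypothetical stand-in for the proposed PunctuationType enum.
    enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

    // Hypothetical schedule record: an interval plus a cancelled flag.
    static final class Schedule {
        final long interval;
        boolean cancelled = false;

        Schedule(long interval) { this.interval = interval; }
    }

    private final Map<PunctuationType, Schedule> schedules = new EnumMap<>(PunctuationType.class);

    /** Registers a schedule, cancelling any previous schedule of the same type. */
    public Schedule schedule(PunctuationType type, long interval) {
        Schedule fresh = new Schedule(interval);
        Schedule previous = schedules.put(type, fresh);
        if (previous != null) {
            previous.cancelled = true;
        }
        return fresh;
    }

    /** Number of live schedules, which can never exceed the number of types. */
    public int activeCount() { return schedules.size(); }

    public static void main(String[] args) {
        ReplacingScheduler s = new ReplacingScheduler();
        Schedule first = s.schedule(PunctuationType.SYSTEM_TIME, 1000L);
        Schedule second = s.schedule(PunctuationType.SYSTEM_TIME, 2000L);
        s.schedule(PunctuationType.STREAM_TIME, 500L);
        System.out.println(first.cancelled);  // true: replaced by the second call
        System.out.println(second.cancelled); // false
        System.out.println(s.activeCount());  // 2: one per PunctuationType
    }
}
```

With this variant the caller no longer needs to keep a handle, but it also loses the ability to hold two schedules of the same type at once.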
>>
>>
>> Below is an example assuming approach a) is implemented by having
>> schedule return Cancellable instead of void.
>>
>> ProcessorContext context;
>> long streamTimeInterval = ...;
>> long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
>> Cancellable streamTimeSchedule;
>> Cancellable systemTimeSchedule;
>> long lastStreamTimePunctuation = -1;
>>
>> public void init(ProcessorContext context) {
>>     this.context = context;
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void streamTimePunctuate(long streamTime) {
>>     periodicBusiness(streamTime);
>>
>>     // a stream-time punctuation happened: reset the system-time upper bound
>>     systemTimeSchedule.cancel();
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void systemTimePunctuate(long systemTime) {
>>     periodicBusiness(context.timestamp());
>>
>>     // no stream-time punctuation within the upper bound: restart the stream-time schedule
>>     streamTimeSchedule.cancel();
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>> }
>>
>> public void periodicBusiness(long streamTime) {
>>     // guard against streamTime == -1, easy enough.
>>     // if you need system time instead, just use System.currentTimeMillis()
>>
>>     // do something businessy here
>> }
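To see how the cancel-and-reschedule pattern above behaves, here is a self-contained toy simulation. Everything in it is a hypothetical stand-in: PunctuationType, Cancellable, ToyContext and its advance(streamTime, systemTime) method are not Kafka Streams classes. The toy firing rule (a schedule fires once its clock reaches the due time, and the next due time is one interval past that clock) is an assumption and does not reproduce PunctuationQueue. periodicBusiness() is replaced by appending to a log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

public class HybridPunctuationDemo {
    // Hypothetical stand-ins for the proposed API (not real Kafka Streams types).
    enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

    interface Cancellable { void cancel(); }

    static final class Schedule implements Cancellable {
        final PunctuationType type;
        final long interval;
        final LongConsumer callback;
        long due;
        boolean cancelled;
        Schedule(PunctuationType type, long interval, long due, LongConsumer callback) {
            this.type = type;
            this.interval = interval;
            this.due = due;
            this.callback = callback;
        }
        @Override
        public void cancel() { cancelled = true; }
    }

    /** Toy context: a schedule fires once its clock reaches the due time. */
    static final class ToyContext {
        private final List<Schedule> schedules = new ArrayList<>();
        private long streamTime = 0, systemTime = 0;

        Cancellable schedule(PunctuationType type, long interval, LongConsumer callback) {
            long now = (type == PunctuationType.STREAM_TIME) ? streamTime : systemTime;
            Schedule s = new Schedule(type, interval, now + interval, callback);
            schedules.add(s);
            return s;
        }

        void advance(long newStreamTime, long newSystemTime) {
            streamTime = newStreamTime;
            systemTime = newSystemTime;
            // Iterate over a copy because callbacks cancel and add schedules.
            for (Schedule s : new ArrayList<>(schedules)) {
                long clock = (s.type == PunctuationType.STREAM_TIME) ? streamTime : systemTime;
                if (!s.cancelled && clock >= s.due) {
                    s.due = clock + s.interval;
                    s.callback.accept(clock);
                }
            }
            schedules.removeIf(s -> s.cancelled);
        }
    }

    /** The hybrid logic from the example, with periodicBusiness() replaced by a log entry. */
    static final class HybridProcessor {
        private final long streamTimeInterval;
        private final long systemTimeUpperBound;
        private ToyContext context;
        private Cancellable streamTimeSchedule;
        private Cancellable systemTimeSchedule;
        final List<String> fired = new ArrayList<>();
        HybridProcessor(long streamTimeInterval, long systemTimeUpperBound) {
            this.streamTimeInterval = streamTimeInterval;
            this.systemTimeUpperBound = systemTimeUpperBound;
        }
        void init(ToyContext context) {
            this.context = context;
            streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
            systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
        }
        void streamTimePunctuate(long streamTime) {
            fired.add("stream@" + streamTime);
            systemTimeSchedule.cancel(); // push the system-time fallback further out
            systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME, systemTimeUpperBound, this::systemTimePunctuate);
        }
        void systemTimePunctuate(long systemTime) {
            fired.add("system@" + systemTime);
            streamTimeSchedule.cancel(); // restart stream-time punctuation from the current stream time
            streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME, streamTimeInterval, this::streamTimePunctuate);
        }
    }

    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        HybridProcessor processor = new HybridProcessor(100L, 150L);
        processor.init(context);
        context.advance(100, 100); // stream time advances: stream-time punctuation
        context.advance(200, 200); // and again
        context.advance(200, 300); // stream time stalls: nothing due yet
        context.advance(200, 350); // upper bound reached: system-time fallback
        context.advance(300, 400); // stream time resumes
        System.out.println(processor.fired); // [stream@100, stream@200, system@350, stream@300]
    }
}
```

While stream time advances steadily, only stream-time punctuations fire. Once it stalls, the system-time schedule fires after the upper bound, and each system-time punctuation restarts the stream-time schedule from the current stream time.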
>>
>> Here, Cancellable is an interface containing either just a single
>> void cancel() method, or also a boolean isCancelled() method, as in
>> Akka's Cancellable
>> <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
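A minimal sketch of the second variant (with isCancelled()), assuming the scheduler keeps the returned handle and skips it once it is cancelled. ScheduleHandle and CancellableDemo are hypothetical names used only for illustration.

```java
public class CancellableDemo {
    public static void main(String[] args) {
        ScheduleHandle handle = new ScheduleHandle();
        System.out.println(handle.isCancelled()); // false
        handle.cancel();
        handle.cancel(); // a second call is harmless
        System.out.println(handle.isCancelled()); // true
    }
}

/** Hypothetical Cancellable including the optional isCancelled() query. */
interface Cancellable {
    /** Stops future punctuations of the schedule; calling it again has no further effect. */
    void cancel();

    /** Optional second method, modelled on Akka's Cancellable. */
    boolean isCancelled();
}

/** Hypothetical handle a scheduler could return from schedule() and check before firing. */
final class ScheduleHandle implements Cancellable {
    private boolean cancelled = false;

    @Override
    public void cancel() { cancelled = true; }

    @Override
    public boolean isCancelled() { return cancelled; }
}
```

If only void cancel() were kept, the scheduler could still track cancellation internally. isCancelled() merely lets the caller query that state.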
>>
>>
>> Please let me know whether we should proceed in this direction or
>> leave "hybrid" considerations out of scope.
>>
>> Looking forward to hearing your thoughts.
>>
>> Thanks,
>> Michal
>>
>> On 30/04/17 20:07, Michal Borowiecki wrote:
>>>
>>> Hi Matthias,
>>>
>>> I'd like to start moving the discarded ideas into the Rejected
>>> Alternatives section. Before I do, I want to tidy them up and ensure
>>> each has been given proper treatment.
>>>
>>> To that end, let me go back to one of your earlier comments about the
>>> original suggestion (A), to put that one to bed.
>>>
>>>
>>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>>> (A) You argue that users can still "punctuate" on event-time via
>>>> process(), but I am not sure if this is possible. Note that users only
>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>> track the time progress per partition (based on the partitions they
>>>> observe via context.partition()). (This alone puts a huge burden on the
>>>> user.) However, users are not notified at startup which partitions are
>>>> assigned, and users are not notified when partitions get revoked.
>>>> Because this information is not available, it's not possible to
>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>> process() seems not to be possible -- or do you see a way to get it
>>>> done? And even if so, it might still be too clumsy to use.
>>> I might have missed something, but I'm guessing your worry about users
>>> having to track time progress /per partition/ comes from what
>>> stream-time does currently.
>>> But I'm not sure that those semantics of stream-time are ideal for
>>> users of punctuate.
>>> That is, if stream-time punctuate didn't exist and users had to use
>>> process(), would they actually want to use the current semantics of
>>> stream time?
>>>
>>> As a reminder, stream time, in all its glory, is approximately the
>>> following (not exactly: while trying to be absolutely precise here I
>>> spotted KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144>,
>>> but I think this approximation suffices to illustrate the point):
>>>
>>> a minimum across all input partitions of (
>>>    if(msgs never received by partition) -1;
>>>    else {
>>>       a non-descending-minimum of ( the per-batch minimum msg timestamp)
>>>    }
>>> )
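To make the approximation above concrete, here is a small sketch that evaluates it for a few batches. StreamTimeSketch and its methods are hypothetical; the code implements only the approximate formula as written and does not model KAFKA-5144 or the real StreamTask logic. It assumes timestamps are non-negative, so that a never-fed partition (-1) is always the minimum.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Evaluates the approximate stream-time formula: the minimum across partitions of a
 * per-partition value that only moves forward with each batch's minimum timestamp,
 * or -1 while any partition has not received messages.
 */
public class StreamTimeSketch {
    // -1 marks a partition that has not received any messages yet.
    private final Map<Integer, Long> partitionTime = new HashMap<>();

    public StreamTimeSketch(int... partitions) {
        for (int p : partitions) {
            partitionTime.put(p, -1L);
        }
    }

    /** Applies one batch: the partition keeps the running maximum of batch minima. */
    public void onBatch(int partition, long... timestamps) {
        if (timestamps.length == 0) {
            return; // an empty batch carries no time information
        }
        long batchMin = Long.MAX_VALUE;
        for (long ts : timestamps) {
            batchMin = Math.min(batchMin, ts);
        }
        long previous = partitionTime.getOrDefault(partition, -1L);
        partitionTime.put(partition, Math.max(previous, batchMin));
    }

    /** Minimum over all partitions; a never-fed partition (-1) pulls the result to -1. */
    public long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            min = Math.min(min, t);
        }
        return partitionTime.isEmpty() ? -1L : min;
    }

    public static void main(String[] args) {
        StreamTimeSketch st = new StreamTimeSketch(0, 1);
        st.onBatch(0, 10, 5, 7);
        System.out.println(st.streamTime()); // -1: partition 1 has not received anything
        st.onBatch(1, 20, 30);
        System.out.println(st.streamTime()); // 5: min(5, 20)
        st.onBatch(0, 3);
        System.out.println(st.streamTime()); // 5: partition 0 does not move backwards
        st.onBatch(0, 40, 50);
        System.out.println(st.streamTime()); // 20: min(40, 20)
    }
}
```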
>>>
>>> Would that really be clear enough to the users of punctuate? Do they
>>> care for such a convoluted notion of time? I see how this can be
>>> useful for StreamTask to pick the next partition to take a record
>>> from, but for punctuate?
>>> If users had to implement punctuation with process(), is that what
>>> they would have chosen as their notion of time?
>>> I'd argue not.
>>>
>>> None of the processors implementing the rich windowing/join
>>> operations in the DSL use punctuate.
>>> Let's take KStreamKStreamJoinProcessor as an example: in its
>>> process() method it simply uses context().timestamp(), which, since
>>> it's called from process(), returns, per the javadoc:
>>>     If it is triggered while processing a record streamed from the source
>>>     processor, timestamp is defined as the timestamp of the current input
>>>     record;
>>> So they don't use that convoluted formula for stream-time. Instead,
>>> they only care about the timestamp of the current record. I think
>>> that having users track just that wouldn't be that much of a burden.
>>> I don't think they need to care about which partitions got assigned
>>> or not. And StreamTask would still be picking records first from the
>>> partition having the lowest timestamp to try to "synchronize" the
>>> streams as it does now.
>>>
>>> What users would have to do in their Processor implementations is
>>> something along the lines of:
>>>
>>> long lastPunctuationTime = 0;
>>> long interval = <some-number>; // millis
>>>
>>> @Override
>>> public void process(K key, V value) {
>>>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>>>         punctuate(ctx.timestamp());
>>>         // I'm not sure of the merit of this vs
>>>         // lastPunctuationTime = ctx.timestamp(); but that's what
>>>         // PunctuationQueue does currently
>>>         lastPunctuationTime += interval;
>>>     }
>>>     // do some other business logic here
>>> }
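The loop above can be exercised outside Kafka Streams by passing a plain recordTimestamp in place of ctx.timestamp(). ProcessDrivenPunctuation is a hypothetical name, and its punctuate() only records the timestamp it was called with.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy version of punctuating from process() using only the current record's timestamp. */
public class ProcessDrivenPunctuation {
    private final long interval;
    private long lastPunctuationTime = 0;
    final List<Long> punctuations = new ArrayList<>();

    public ProcessDrivenPunctuation(long interval) { this.interval = interval; }

    /** recordTimestamp stands in for ctx.timestamp() in the snippet above. */
    public void process(long recordTimestamp) {
        while (recordTimestamp >= lastPunctuationTime + interval) {
            punctuate(recordTimestamp);
            lastPunctuationTime += interval; // fixed-step advance, as in the loop above
        }
        // other business logic would go here
    }

    private void punctuate(long timestamp) { punctuations.add(timestamp); }

    public static void main(String[] args) {
        ProcessDrivenPunctuation p = new ProcessDrivenPunctuation(100);
        for (long ts : new long[] {50, 120, 130, 350}) {
            p.process(ts);
        }
        System.out.println(p.punctuations); // [120, 350, 350]
    }
}
```

The fixed-step `lastPunctuationTime += interval` catches up on missed intervals. For example, the record at 350 triggers two punctuations, both with timestamp 350, whereas `lastPunctuationTime = ctx.timestamp()` would trigger just one. Starting from 0 also means the first record carrying a real epoch-millisecond timestamp would trigger a very long catch-up loop. A real implementation would therefore probably initialize lastPunctuationTime from the first record.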
>>>
>>> Looking forward to your thoughts.
>>>
>>> Cheers,
>>> Michal
>>>
>>> -- 
>>> Signature
>>> <http://www.openbet.com/>   Michal Borowiecki
>>> Senior Software Engineer L4
>>>     T:      +44 208 742 1600
>>>
>>>     
>>>     +44 203 249 8448
>>>
>>>     
>>>      
>>>     E:      michal.borowie...@openbet.com
>>>     W:      www.openbet.com <http://www.openbet.com/>
>>>
>>>     
>>>     OpenBet Ltd
>>>
>>>     Chiswick Park Building 9
>>>
>>>     566 Chiswick High Rd
>>>
>>>     London
>>>
>>>     W4 5XT
>>>
>>>     UK
>>>
>>>     
>>> <https://www.openbet.com/email_promo>
>>>
>>> This message is confidential and intended only for the addressee. If
>>> you have received this message in error, please immediately notify
>>> the postmas...@openbet.com <mailto:postmas...@openbet.com> and delete
>>> it from your system as well as any copies. The content of e-mails as
>>> well as traffic data may be monitored by OpenBet for employment and
>>> security purposes. To protect the environment please do not print
>>> this e-mail unless necessary. OpenBet Ltd. Registered Office:
>>> Chiswick Park Building 9, 566 Chiswick High Road, London, W4 5XT,
>>> United Kingdom. A company registered in England and Wales. Registered
>>> no. 3134634. VAT no. GB927523612
>>>
>>
> 
