Sure, thanks Matthias. My ID is [arunmathew88].
Of course. I was thinking of a subpage where people can collaborate.
Will do as per Michael’s suggestion.
Regards,
Arun Mathew
On 4/7/17, 12:30, "Matthias J. Sax" <matth...@confluent.io> wrote:
Please share your Wiki-ID and a committer can give you write access.
Btw: as you did not initiate the KIP, you should not change the KIP
without the permission of the original author -- in this case Michael.
So you might also just share your thoughts over the mailing list and
Michael can update the KIP page. Or, as an alternative, just create a
subpage for the KIP page.
@Michael: WDYT?
-Matthias
On 4/6/17 8:05 PM, Arun Mathew wrote:
> Hi Jay,
> Thanks for the advice. I would like to list the use cases as per your
> suggestion. But it seems I don't have write permission to the Apache Kafka
> Confluence space. Whom shall I ask for it?
>
> Regarding your last question: we are using a patch in our production
> system which does exactly this. We window by event time, but trigger
> punctuate after <punctuate interval> of system time if no event has
> crossed the punctuate event time.
>
> We are using Kafka Streams for our Audit Trail, where we need to output
> the event counts on each topic on each cluster, aggregated over a 1 minute
> window. We have to use event time to be able to cross-check the counts. But
> we need to trigger punctuate [aggregate event pushes] by system time in
> the absence of events. Otherwise the event counts for unexpired windows
> would be 0, which is bad.
>
> "Maybe a hybrid solution works: I window by event time but trigger
> results by system time for windows that have updated? Not really sure the
> details of making that work. Does that work? Are there concrete examples
> where you actually want the current behavior?"
>
> --
> With Regards,
>
> Arun Mathew
> Yahoo! JAPAN Corporation
>
> On Wed, Apr 5, 2017 at 8:48 PM, Tianji Li <skyah...@gmail.com> wrote:
>
>> Hi Jay,
>>
>> The hybrid solution is exactly what I expect and need for our use cases
>> when dealing with telecom data.
>>
>> Thanks
>> Tianji
>>
>> On Wed, Apr 5, 2017 at 12:01 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>>> Hey guys,
>>>
>>> One thing I've always found super important for this kind of design
>>> work is to do a really good job of cataloging the landscape of use cases
>>> and how prevalent each one is. By that I mean not just listing lots of
>>> uses, but also grouping them into categories that functionally need the
>>> same thing. In the absence of this it is very hard to reason about design
>>> proposals. From the proposals so far I think we have a lot of discussion
>>> around possible apis, but less around what the user needs for different
>>> use cases and how they would implement that using the api.
>>>
>>> Here is an example:
>>> You aggregate click and impression data for a Reddit-like site. Every
>>> ten minutes you want to output a ranked list of the top 10 articles
>>> ranked by clicks/impressions for each geographical area. I want to be
>>> able to run this in steady state as well as rerun to regenerate results
>>> (or catch up if it crashes).
>>>
>>> There are a couple of tricky things that seem to make this hard with
>>> either of the options proposed:
>>> 1. If I emit this data using event time I have the problem described
>>> where a geographical region with no new clicks or impressions will fail
>>> to output results.
>>> 2. If I emit this data using system time I have the problem that when
>>> reprocessing data my window may not be ten minutes but 10 hours if my
>>> processing is very fast so it dramatically changes the output.
>>>
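To put rough numbers on point 2: if a replay processes records some factor
faster than they were originally produced, each system-time trigger spans
that many times more event time. The snippet below only illustrates that
arithmetic. `ReplaySpan`, `eventTimeCovered` and the speedup factor are
made-up names and values for this example, not anything from Kafka Streams.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only.
final class ReplaySpan {
    // Event time covered by one system-time trigger interval when records
    // are processed `speedup` times faster than they were produced.
    static Duration eventTimeCovered(Duration triggerInterval, long speedup) {
        return triggerInterval.multipliedBy(speedup);
    }
}
```

With a ten minute trigger and a replay assumed to run 60 times faster than
real time, `ReplaySpan.eventTimeCovered(Duration.ofMinutes(10), 60)` comes
out to ten hours of event time per output, which is the distortion
described above.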
>>> Maybe a hybrid solution works: I window by event time but trigger
>>> results by system time for windows that have updated? Not really sure the
>>> details of making that work. Does that work? Are there concrete examples
>>> where you actually want the current behavior?
>>>
>>> -Jay
>>>
>>>
>>> On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew <arunmathe...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Thanks for the KIP. We were also in need of a mechanism to trigger
>>>> punctuate in the absence of events.
>>>>
>>>> As I described in [
>>>> https://issues.apache.org/jira/browse/KAFKA-3514?focusedCommentId=15926036&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15926036
>>>> ],
>>>>
>>>> - Our approach involved using the event time by default.
>>>> - The method that checks whether any punctuate is ready in the
>>>> PunctuationQueue is triggered by any event received by the stream
>>>> thread, or at the polling intervals in the absence of any events.
>>>> - When we create Punctuate objects (which contain the next event time
>>>> for punctuation and the interval), we also record the creation time
>>>> (system time).
>>>> - While checking for maturity of the Punctuate Schedule in the
>>>> mayBePunctuate method, we also check whether the punctuate interval
>>>> has elapsed on the system clock since the schedule creation time.
>>>> - In the absence of any event, or in the absence of any event for one
>>>> topic in the partition group assigned to the stream task, the system
>>>> time will pass the interval and we trigger a punctuate using the
>>>> expected punctuation event time.
>>>> - We then create the next punctuation schedule as punctuation event
>>>> time + punctuation interval [again recording the system time of
>>>> creation of the schedule].
>>>>
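The steps above can be sketched roughly as follows. This is a guess at the
shape of the logic, not the actual patch: the class
`HybridPunctuationSchedule`, its fields and the -1 "nothing due" return
value are assumptions made for illustration. Only the method name
mayBePunctuate is taken from the description.

```java
// Hypothetical sketch; HybridPunctuationSchedule is not a Kafka Streams class.
final class HybridPunctuationSchedule {
    private final long intervalMs;
    private long nextEventTimeMs;    // event time at which the next punctuate is due
    private long createdAtSystemMs;  // system time when this schedule was (re)created

    HybridPunctuationSchedule(long firstEventTimeMs, long intervalMs, long nowSystemMs) {
        this.intervalMs = intervalMs;
        this.nextEventTimeMs = firstEventTimeMs;
        this.createdAtSystemMs = nowSystemMs;
    }

    /**
     * Called on every received event and on every poll without events.
     * Returns the event time to punctuate with, or -1 if nothing is due.
     */
    long mayBePunctuate(long streamTimeMs, long nowSystemMs) {
        // Normal case: event time has reached the scheduled punctuation time.
        boolean eventTimeDue = streamTimeMs >= nextEventTimeMs;
        // Fallback: a full interval of system time has passed since creation.
        boolean systemTimeDue = nowSystemMs - createdAtSystemMs >= intervalMs;
        if (!eventTimeDue && !systemTimeDue) {
            return -1L;
        }
        // Either way, punctuate with the expected event time, then schedule
        // the next one and record the system time of its creation.
        long punctuateAt = nextEventTimeMs;
        nextEventTimeMs = punctuateAt + intervalMs;
        createdAtSystemMs = nowSystemMs;
        return punctuateAt;
    }
}
```

Because the returned value is always the scheduled event time rather than
the system clock reading, downstream logic keeps seeing event-time
semantics even when the trigger came from the system-time fallback.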
>>>> We call this a Hybrid Punctuate. Of course, this approach has pros and
>>>> cons.
>>>> Pros
>>>>
>>>> - Punctuates will happen at most <punctuate interval> apart in terms
>>>> of system time.
>>>> - The semantics as a whole continue to revolve around event time.
>>>> - We can use the old data [old timestamps] to rerun any experiments or
>>>> tests.
>>>>
>>>> Cons
>>>>
>>>> - In case the <punctuate interval> is not a time duration [say logical
>>>> time/event count], then the approach might not be meaningful.
>>>> - In case we have to wait for an actual event from a low event rate
>>>> partition in the partition group, this approach will jump the gun.
>>>> - In case the event processing cannot catch up with the event rate and
>>>> the events with the expected timestamp get queued for a long time, this
>>>> approach might jump the gun.
>>>>
>>>> I believe the above approach and discussion come close to approach A.
>>>>
>>>> -----------
>>>>
>>>> I like the idea of having an event-count-based punctuate.
>>>>
>>>> -----------
>>>>
>>>> I agree with the discussion around approach C, that we should provide
>>>> the user with the option to choose system time or event time based
>>>> punctuates. But I believe that the user predominantly wants to use event
>>>> time while not missing out on regular punctuates due to event delays or
>>>> event absences. Hence a complex punctuate option as Matthias mentioned
>>>> (quoted below) would be most apt.
>>>>
>>>> "- We might want to add "complex" schedules later on (like, punctuate
>>>> on every 10 seconds event-time or 60 seconds system-time whatever comes
>>>> first)."
>>>>
>>>> -----------
>>>>
>>>> I think I read somewhere that Kafka Streams started with System Time as
>>>> the punctuation standard, but was later changed to Event Time. I guess
>>>> there would be some good reason behind it. As Kafka Streams wants to
>>>> evolve more on the Stream Processing front, I believe the emphasis on
>>>> event time would remain quite strong.
>>>>
>>>>
>>>> With Regards,
>>>>
>>>> Arun Mathew
>>>> Yahoo! JAPAN Corporation, Tokyo
>>>>
>>>>
>>>> On Wed, Apr 5, 2017 at 3:53 AM, Thomas Becker <tobec...@tivo.com>
>>>> wrote:
>>>>
>>>>> Yeah I like PunctuationType much better; I just threw Time out there
>>>>> more as a strawman than an actual suggestion ;) I still think it's
>>>>> worth considering what this buys us over an additional callback. I
>>>>> foresee a number of punctuate implementations following this pattern:
>>>>>
>>>>> public void punctuate(PunctuationType type) {
>>>>>     switch (type) {
>>>>>         case EVENT_TIME:
>>>>>             methodA();
>>>>>             break;
>>>>>         case SYSTEM_TIME:
>>>>>             methodB();
>>>>>             break;
>>>>>     }
>>>>> }
>>>>>
>>>>> I guess one advantage of this approach is we could add additional
>>>>> punctuation types later in a backwards compatible way (like event
>>>>> count as you mentioned).
>>>>>
>>>>> -Tommy
>>>>>
>>>>>
>>>>> On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
>>>>>> That sounds promising.
>>>>>>
>>>>>> I am just wondering if `Time` is the best name. Maybe we want to add
>>>>>> other non-time based punctuations at some point later. I would suggest
>>>>>>
>>>>>> enum PunctuationType {
>>>>>>     EVENT_TIME,
>>>>>>     SYSTEM_TIME,
>>>>>> }
>>>>>>
>>>>>> or similar. Just to keep the door open -- it's easier to add new stuff
>>>>>> if the name is more generic.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 4/4/17 5:30 AM, Thomas Becker wrote:
>>>>>>>
>>>>>>> I agree that the framework providing and managing the notion of
>>>>>>> stream time is valuable and not something we would want to delegate
>>>>>>> to the tasks. I'm not entirely convinced that a separate callback
>>>>>>> (option C) is that messy (it could just be a default method with an
>>>>>>> empty implementation), but if we wanted a single API to handle both
>>>>>>> cases, how about something like the following?
>>>>>>>
>>>>>>> enum Time {
>>>>>>>     STREAM,
>>>>>>>     CLOCK
>>>>>>> }
>>>>>>>
>>>>>>> Then on ProcessorContext:
>>>>>>> context.schedule(Time time, long interval) // We could allow this to
>>>>>>> be called once for each value of time to mix approaches.
>>>>>>>
>>>>>>> Then the Processor API becomes:
>>>>>>> punctuate(Time time) // time here denotes which schedule resulted in
>>>>>>> this call.
>>>>>>>
>>>>>>> Thoughts?
>>>>>>>
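A minimal sketch of how the proposal above might fit together, assuming one
schedule per Time value. Everything here is hypothetical and written only
for illustration: `TimedProcessor` and `ScheduleRegistry` are made-up
names, and passing a timestamp to punctuate is an addition not present in
the proposal. None of it is the Kafka Streams API.

```java
import java.util.EnumMap;
import java.util.Map;

// All names below are hypothetical; this is not the Kafka Streams API.
enum Time { STREAM, CLOCK }

interface TimedProcessor {
    // `time` tells the processor which schedule caused this call.
    void punctuate(Time time, long timestampMs);
}

final class ScheduleRegistry {
    private final Map<Time, Long> intervalMs = new EnumMap<>(Time.class);
    private final Map<Time, Long> nextDueMs = new EnumMap<>(Time.class);

    // At most one schedule per Time value, so both kinds can be mixed.
    void schedule(Time time, long interval, long startMs) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (intervalMs.containsKey(time)) {
            throw new IllegalStateException(time + " already scheduled");
        }
        intervalMs.put(time, interval);
        nextDueMs.put(time, startMs + interval);
    }

    // Called with the current stream time or wall-clock time; fires every
    // punctuation of that kind that has become due and returns the count.
    int advance(Time time, long nowMs, TimedProcessor processor) {
        Long interval = intervalMs.get(time);
        if (interval == null) {
            return 0; // nothing registered for this kind of time
        }
        long due = nextDueMs.get(time);
        int fired = 0;
        while (nowMs >= due) {
            processor.punctuate(time, due);
            due += interval;
            fired++;
        }
        nextDueMs.put(time, due);
        return fired;
    }
}
```

In this sketch the framework side would call advance(Time.STREAM, ...) as
stream time moves forward and advance(Time.CLOCK, ...) from its polling
loop, so a processor that registers both schedules receives both kinds of
callback through the single punctuate method.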
>>>>>>>
>>>>>>> On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>> Thanks a lot for the KIP Michal,
>>>>>>>>
>>>>>>>> I was thinking about the four options you proposed in more detail
>>>>>>>> and these are my thoughts:
>>>>>>>>
>>>>>>>> (A) You argue that users can still "punctuate" on event-time via
>>>>>>>> process(), but I am not sure if this is possible. Note that users
>>>>>>>> only get record timestamps via context.timestamp(). Thus, users
>>>>>>>> would need to track the time progress per partition (based on the
>>>>>>>> partitions they observe via context.partition()). (This alone puts
>>>>>>>> a huge burden on the user.) However, users are not notified at
>>>>>>>> startup what partitions are assigned, and users are not notified
>>>>>>>> when partitions get revoked. Because this information is not
>>>>>>>> available, it's not possible to "manually advance" stream-time, and
>>>>>>>> thus event-time punctuation within process() seems not to be
>>>>>>>> possible -- or do you see a way to get it done? And even if so, it
>>>>>>>> might still be too clumsy to use.
>>>>>>>>
>>>>>>>> (B) This does not allow mixing both approaches, thus limiting what
>>>>>>>> users can do.
>>>>>>>>
>>>>>>>> (C) This should give all the flexibility we need. However, just
>>>>>>>> adding one more method seems to be a solution that is too simple
>>>>>>>> (cf. my comments below).
>>>>>>>>
>>>>>>>> (D) This might be hard to use. Also, I am not sure how a user could
>>>>>>>> enable system-time and event-time punctuation in parallel.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Overall, option (C) seems to be the most promising approach to me.
>>>>>>>> Because I also favor a clean API, we might keep the current
>>>>>>>> punctuate() as-is, but deprecate it -- so we can remove it at some
>>>>>>>> later point when people use the "new punctuate API".
>>>>>>>>
>>>>>>>>
>>>>>>>> Couple of follow up questions:
>>>>>>>>
>>>>>>>> - I am wondering if we should have two callback methods or just one
>>>>>>>> (i.e., a unified one for system and event time punctuation, or one
>>>>>>>> for each?).
>>>>>>>>
>>>>>>>> - If we have one, how can the user figure out which condition
>>>>>>>> triggered it?
>>>>>>>>
>>>>>>>> - What would the API look like for registering different punctuate
>>>>>>>> schedules? The "type" must be somehow defined?
>>>>>>>>
>>>>>>>> - We might want to add "complex" schedules later on (like, punctuate
>>>>>>>> on every 10 seconds event-time or 60 seconds system-time, whatever
>>>>>>>> comes first). I don't say we should add this right away, but we
>>>>>>>> might want to define the API in a way that allows extensions like
>>>>>>>> this later on, without redesigning the API (i.e., the API should be
>>>>>>>> designed to be extensible).
>>>>>>>>
>>>>>>>> - Did you ever consider count-based punctuation?
>>>>>>>>
>>>>>>>>
>>>>>>>> I understand that you would like to solve a simple problem, but we
>>>>>>>> learned from the past that just "adding some API" quickly leads to
>>>>>>>> a not very well defined API that needs time-consuming clean up
>>>>>>>> later on via other KIPs. Thus, I would prefer to get a holistic
>>>>>>>> punctuation KIP with this from the beginning to avoid painful
>>>>>>>> redesign later.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 4/3/17 11:58 AM, Michal Borowiecki wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks Thomas,
>>>>>>>>>
>>>>>>>>> I'm also wary of changing the existing semantics of punctuate, for
>>>>>>>>> backward compatibility reasons, although I like the conceptual
>>>>>>>>> simplicity of that option.
>>>>>>>>>
>>>>>>>>> Adding a new method to me feels safer but, in a way, uglier. I
>>>>>>>>> added this to the KIP now as option (C).
>>>>>>>>>
>>>>>>>>> The TimestampExtractor mechanism is actually more flexible, as it
>>>>>>>>> allows you to return any value, you're not limited to event time
>>>>>>>>> or system time (although I don't see an actual use case where you
>>>>>>>>> might need anything else than those two). Hence I also proposed
>>>>>>>>> the option to allow users to, effectively, decide what "stream
>>>>>>>>> time" is for them given the presence or absence of messages, much
>>>>>>>>> like they can decide what msg time means for them using the
>>>>>>>>> TimestampExtractor. What do you think about that? This is probably
>>>>>>>>> most flexible but also most complicated.
>>>>>>>>>
>>>>>>>>> All comments appreciated.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Michal
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 03/04/17 19:23, Thomas Becker wrote:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Although I fully agree we need a way to trigger periodic
>>>>>>>>>> processing that is independent from whether and when messages
>>>>>>>>>> arrive, I'm not sure I like the idea of changing the existing
>>>>>>>>>> semantics across the board. What if we added an additional
>>>>>>>>>> callback to Processor that can be scheduled similarly to
>>>>>>>>>> punctuate() but was always called at fixed, wall clock based
>>>>>>>>>> intervals? This way you wouldn't have to give up the notion of
>>>>>>>>>> stream time to be able to do periodic processing.
>>>>>>>>>>
>>>>>>>>>> On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have created a draft for KIP-138: Change punctuate semantics
>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
>>>>>>>>>>>
>>>>>>>>>>> Appreciating there can be different views on system-time vs
>>>>>>>>>>> event-time semantics for punctuation depending on use-case and
>>>>>>>>>>> the importance of backwards compatibility of any such change,
>>>>>>>>>>> I've left it quite open and hope to fill in more info as the
>>>>>>>>>>> discussion progresses.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Michal
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>> Tommy Becker
>>>>>>>
>>>>>>> Senior Software Engineer
>>>>>>>
>>>>>>> O +1 919.460.4747
>>>>>>>
>>>>>>> tivo.com
>>>>>>>
>>>>>>>
>>>>>>> ________________________________
>>>>>>>
>>>>>>> This email and any attachments may contain confidential and
>>>>>>> privileged material for the sole use of the intended recipient. Any
>>>>>>> review, copying, or distribution of this email (or any attachments)
>>>>>>> by others is prohibited. If you are not the intended recipient,
>>>>>>> please contact the sender immediately and permanently delete this
>>>>>>> email and any attachments. No employee or agent of TiVo Inc. is
>>>>>>> authorized to conclude any binding agreement on behalf of TiVo Inc.
>>>>>>> by email. Binding agreements with TiVo Inc. may only be made by a
>>>>>>> signed written agreement.
>>>>>>>