Moved this KIP into status "inactive". Feel free to resume at any time.

-Matthias

On 12/18/17 4:09 PM, Matthias J. Sax wrote:
> I just want to point out that the basic idea is great and that we should
> apply optimizations like "filter first" and others. But we should NOT
> conflate this KIP with orthogonal improvements.
> 
> In fact, we have an umbrella JIRA for DSL optimization already:
> https://issues.apache.org/jira/browse/KAFKA-6034
> 
> @Jan: feel free to create sub-tasks for new optimization ideas and we
> can take it from there.
> 
> 
> -Matthias
> 
> 
> On 12/18/17 7:55 AM, Bill Bejeck wrote:
>> Jan,
>>
>> I apologize for the delayed response.
>>
>>> my suggestion would be that instead of
>>>
>>> SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
>>>
>>> we build
>>>
>>> SOURCE  -> KTABLEFILTER ->  KTABLESOURCE -> JOIN -> SINK
>>
>>
>> I agree that filtering before the KTable source makes sense and would be a
>> positive change to implement.
>>
>> But the situation above is just one scenario out of many we need to
>> consider.  I'm not sure we can cover all the implications from different
>> use cases ahead of time.
>>
>> So I'm inclined to agree with Guozhang that we come up with clear "rules"
>> (I use the word rules for lack of a better term) for RecordContext usage
>> and inheritance. That way going forward we can have distinct expectations
>> of different use cases.
>>
>> -Bill
>>
>> On Fri, Dec 15, 2017 at 3:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Regarding the record context inheritance: I agree it may be a better idea
>>> for now to drop the information when we cannot come up with a consensus
>>> about how the record context should be inherited. Like Bill I was a bit
>>> worried about the lack of such data lineage information for
>>> troubleshooting in operations or debugging in coding; but I think we can
>>> still try to come up with better solutions in the future by extending the
>>> current protocol, rather than coming up with something now that we later
>>> realize we need to change.
>>>
>>> Regarding the table / filter question: I agree with Jan that we could
>>> consider updating the builder so that we push the filter down ahead of
>>> the KTable source that materializes the store; on the other hand, I think
>>> Matthias' point is that even doing this does not completely exclude the
>>> scenarios where you'd have the old/new pairs in your tables, for example,
>>> consider:
>>>
>>> table1 = stream1.groupBy(...).aggregate(...)
>>> table2 = table1.filter(..., Materialized.as(...))
>>>
>>> In this case table2 is filtering on table1, which is not read from the
>>> source and hence already outputs the old/new pairs, so we still need to
>>> consider how to handle it.
>>>
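To make the old/new pair concrete, here is a minimal self-contained sketch in plain Java with no Kafka dependency. `Change`, `TableFilterSketch`, and `filterChange` are hypothetical stand-ins that only loosely model what a table filter does with a <newValue, oldValue> pair; this is an illustration under those assumptions, not the actual Streams implementation.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the <newValue, oldValue> pair that flows between table operators.
final class Change<V> {
    final V newValue;
    final V oldValue;

    Change(V newValue, V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

final class TableFilterSketch {
    // The user predicate is evaluated against BOTH halves of the pair.
    // A half that fails the predicate becomes null (it is "not in" the filtered table).
    // Returns null when neither half passes, i.e. there is nothing to forward.
    static <V> Change<V> filterChange(Change<V> in, Predicate<V> predicate) {
        V newValue = in.newValue != null && predicate.test(in.newValue) ? in.newValue : null;
        V oldValue = in.oldValue != null && predicate.test(in.oldValue) ? in.oldValue : null;
        if (newValue == null && oldValue == null) {
            return null;
        }
        return new Change<>(newValue, oldValue);
    }

    public static void main(String[] args) {
        Predicate<Integer> atLeastTen = v -> v >= 10;
        // The key used to hold 12 and is updated to 5: only the old half passes the filter.
        Change<Integer> out = filterChange(new Change<>(5, 12), atLeastTen);
        System.out.println("new=" + out.newValue + ", old=" + out.oldValue);
    }
}
```

The point of the sketch is that the predicate runs on the old value too, while the only record metadata at hand belongs to the update that carried the new value.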
>>>
>>> So I'd suggest the following execution plan towards KIP-159:
>>>
>>> 1) revisit our record context (topic, partition, offset, timestamp)
>>> protocol that is used at the DSL layer, make it clear at which high-level
>>> operators we should apply certain inheritance rules, and at which others
>>> we should drop such information.
>>>     1.1) modify the lower-level PAPI that DSL leverages, to allow the
>>> caller (DSL) to modify the record context (note that today for lower-level
>>> API, the record context is always passed through when forwarding to the
>>> next processor node)
>>> 2) at the same time, consider optimizing the source KTable filter cases (I
>>> think we already have certain JIRA tickets for this) so that the filter
>>> operator is pushed earlier than the KTABLESOURCE node where materialization
>>> happens.
>>> 3) after 1) is done, come back to KIP-159 and add the proposed APIs.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Dec 7, 2017 at 12:27 PM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> Thank you Bill,
>>>>
>>>> I think this is reasonable. Do you have any suggestion
>>>> for handling oldValues in cases like
>>>>
>>>> builder.table().filter(RichPredicate).join()
>>>>
>>>> where we process a Change with old and new value and don't have a record
>>>> context for old.
>>>>
>>>> my suggestion would be that instead of
>>>>
>>>> SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
>>>>
>>>> we build
>>>>
>>>> SOURCE  -> KTABLEFILTER ->  KTABLESOURCE -> JOIN -> SINK
>>>>
>>>> We should build a topology like this from the beginning and not have
>>>> an optimisation phase afterwards.
>>>>
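A small sketch of why the ordering matters for store size, in plain Java with no Kafka dependency. The class and method names are hypothetical, and a `HashMap` stands in for the state store; it only illustrates the two layouts above under those assumptions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

final class FilterPlacementSketch {
    // SOURCE -> KTABLESOURCE -> KTABLEFILTER: every update is materialized,
    // the filter only runs downstream of the store.
    static Map<String, Integer> materializeThenFilter(List<Map.Entry<String, Integer>> updates) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> u : updates) {
            store.put(u.getKey(), u.getValue());
        }
        return store;
    }

    // SOURCE -> KTABLEFILTER -> KTABLESOURCE: only passing updates reach the store.
    // An update that fails the predicate acts like a delete for that key.
    static Map<String, Integer> filterThenMaterialize(List<Map.Entry<String, Integer>> updates,
                                                      Predicate<Integer> predicate) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> u : updates) {
            if (predicate.test(u.getValue())) {
                store.put(u.getKey(), u.getValue());
            } else {
                store.remove(u.getKey());
            }
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> updates = new ArrayList<>();
        updates.add(new AbstractMap.SimpleEntry<>("a", 1));
        updates.add(new AbstractMap.SimpleEntry<>("b", 20));
        updates.add(new AbstractMap.SimpleEntry<>("c", 3));
        updates.add(new AbstractMap.SimpleEntry<>("d", 40));

        System.out.println("filter after store:  " + materializeThenFilter(updates).size() + " entries");
        System.out.println("filter before store: " + filterThenMaterialize(updates, v -> v >= 10).size() + " entries");
    }
}
```

With the filter first, the store only ever sees new values, so the old half of a change pair never has to be shown to the user predicate.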
>>>> Any opinions?
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>>
>>>> On 05.12.2017 17:34, Bill Bejeck wrote:
>>>>
>>>>> Matthias,
>>>>>
>>>>> Overall I agree with what you've presented here.
>>>>>
>>>>> Initially, I was hesitant to remove information from the context of the
>>>>> result records (Joins or Aggregations) with the thought that when there
>>>>> are unexpected results, the source information would be useful for
>>>>> tracing back where the error could have occurred.  But in the case of
>>>>> Joins and Aggregations, the amount of data needed to do meaningful
>>>>> analysis could be too much. For example, a join result could come from
>>>>> two topics so you'd need to keep both original topic names, offsets,
>>>>> etc. (plus the broker could have deleted the records in the interim so
>>>>> even having offset could provide nothing).
>>>>>
>>>>> I'm a bit long-winded here, but I've come full circle to your original
>>>>> proposal that since Joins and Aggregations produce fundamentally new
>>>>> types, we drop the corresponding information from the context even in
>>>>> the case of single topic aggregations.
>>>>>
>>>>> Thanks,
>>>>> Bill
>>>>>
>>>>> On Mon, Dec 4, 2017 at 7:02 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I agree with Guozhang that just exposing meta data at the source level
>>>>>> might not provide too much value. Furthermore, for timestamps we do
>>>>>> already have a well defined contract and we should exploit it:
>>>>>> timestamps can always be provided in a meaningful way.
>>>>>>
>>>>>> Also, for simple operations like KStream-filter/map the contract is
>>>>>> simple and we can just use it. Same for KTable-filter/map (for new
>>>>>> values).
>>>>>>
>>>>>> For aggregations, join, and oldValue, we could just drop some
>>>>>> information and return `null`/-1, if the result record has no
>>>>>> semantically meaningful meta data.
>>>>>>
>>>>>> For example, for aggregations, we could preserve the partition (as all
>>>>>> agg-input-records have the same partition). For single input topic
>>>>>> aggregation (what I guess is the most prominent case), we can also
>>>>>> carry over the topic name (would often be an internal repartitioning
>>>>>> topic name). Offsets don't have any semantic interpretation IMHO and we
>>>>>> could return -1.
>>>>>>
>>>>>> For joins, we could keep the partition information. Topic and offset
>>>>>> are both unknown/invalid for the output record IMHO.
>>>>>>
>>>>>> For the oldValue case, we can keep partition and for single input topic
>>>>>> case topic name. Timestamp might be -1 for now, but after we add
>>>>>> timestamps to KTable (what we plan to do anyway), we can also return a
>>>>>> valid timestamp. Offset would be -1 again (if we store the offset in
>>>>>> KTable too, we could provide the offset as well -- but I don't see too
>>>>>> much value in doing this compared to the storage overhead this implies).
>>>>>>
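The inheritance rules proposed above could be sketched roughly as follows. This is plain Java with no Kafka dependency; `RecordContextSketch` and its method names are hypothetical and are not the KIP's API. Carrying the timestamp over unchanged is an assumption standing in for whatever the existing timestamp contract defines.

```java
// Hypothetical model of record metadata and the proposed per-operator inheritance rules.
final class RecordContextSketch {
    static final long UNKNOWN = -1L;

    final String topic;     // null means "no meaningful topic"
    final int partition;
    final long offset;      // UNKNOWN means "no meaningful offset"
    final long timestamp;   // UNKNOWN means "no meaningful timestamp"

    RecordContextSketch(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    // One-to-one operators (e.g. filter, map on new values): pass everything through.
    RecordContextSketch forOneToOne() {
        return this;
    }

    // Aggregation: partition is kept, topic only for a single input topic, offset is dropped.
    RecordContextSketch forAggregation(boolean singleInputTopic) {
        return new RecordContextSketch(singleInputTopic ? topic : null, partition, UNKNOWN, timestamp);
    }

    // Join: only the partition (and, by assumption, the timestamp) stays meaningful.
    RecordContextSketch forJoin() {
        return new RecordContextSketch(null, partition, UNKNOWN, timestamp);
    }

    // oldValue: partition kept, topic for a single input topic, timestamp and offset unknown for now.
    RecordContextSketch forOldValue(boolean singleInputTopic) {
        return new RecordContextSketch(singleInputTopic ? topic : null, partition, UNKNOWN, UNKNOWN);
    }

    @Override
    public String toString() {
        return "topic=" + topic + ", partition=" + partition
                + ", offset=" + offset + ", timestamp=" + timestamp;
    }

    public static void main(String[] args) {
        RecordContextSketch source = new RecordContextSketch("input-topic", 3, 42L, 1000L);
        System.out.println("filter:    " + source.forOneToOne());
        System.out.println("aggregate: " + source.forAggregation(true));
        System.out.println("join:      " + source.forJoin());
        System.out.println("oldValue:  " + source.forOldValue(true));
    }
}
```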
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/29/17 4:14 AM, Jan Filipiak wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thank you for the summary and thanks for acknowledging that I do have
>>>>>>> a point here.
>>>>>>>
>>>>>>> I don't like the second idea at all. Hence I started off this
>>>>>>> discussion.
>>>>>>>
>>>>>>> I am just disappointed; back then, when we had the discussion about
>>>>>>> how to refactor store overload and IQ handling, I knew the path we
>>>>>>> were taking was wrong. Having problems implementing these kinds of
>>>>>>> features (which are really simple) is just a symptom of a messed-up IQ
>>>>>>> implementation. I really wish I could have convinced you guys back
>>>>>>> then. To be honest, with IQ we can continue here, as we materialize
>>>>>>> but would not send oldValue, but with join you're out of luck with the
>>>>>>> current setup.
>>>>>>>
>>>>>>> I of course recommend not introducing any optimizations here. I'd
>>>>>>> recommend going towards what I recommended already back then. So I
>>>>>>> wouldn't say we need to optimize anything later; we need to build
>>>>>>> the topology better in the first place.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 28.11.2017 21:00, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Jan,
>>>>>>>>
>>>>>>>> Thanks for your input, I can understand now that the oldValue is also
>>>>>>>> exposed in the user-customized `filter` function and hence what record
>>>>>>>> context we should expose is a problem. And I think it does bring up a
>>>>>>>> good point to consider for KIP-159. The discussion may be a bit
>>>>>>>> confusing to readers though, and hence I'd like to summarize the
>>>>>>>> status quo along with a proposal:
>>>>>>>>
>>>>>>>> In today's Streams DSL, when a KTable is created either from a source
>>>>>>>> topic, or from a stateful operator, we will materialize the KTable
>>>>>>>> with a backing state store; on the other hand, KTables created from a
>>>>>>>> non-stateful operator like filter will not be backed by a state store
>>>>>>>> by default unless users indicate so (e.g. using the overloaded
>>>>>>>> function with the queryable name or store supplier).
>>>>>>>>
>>>>>>>> For example:
>>>>>>>>
>>>>>>>> // a state store created for table1
>>>>>>>> KTable table1 = builder.table("topic");
>>>>>>>> // no state store created for table2
>>>>>>>> KTable table2 = table1.filter(..);
>>>>>>>> // a state store created for table3
>>>>>>>> KTable table3 = table1.filter(.., "storeName");
>>>>>>>> // a state store created for table4
>>>>>>>> KTable table4 = table1.groupBy(..).aggregate(..);
>>>>>>>>
>>>>>>>> Because of that, the filter() operator above on table1 will always be
>>>>>>>> exposed with oldValue and newValue; Damian's point is that we may
>>>>>>>> optimize the first case such that table1 will only be materialized if
>>>>>>>> users asked so (e.g. using the overloaded function with a store
>>>>>>>> supplier), in which case we do not need to pass newValue / oldValue
>>>>>>>> pairs (I think this is what Jan suggests as well, i.e. do filtering
>>>>>>>> before materializing, so that we can have a smaller backing state
>>>>>>>> store as well). But this optimization does not eliminate the
>>>>>>>> possibility that we may still need to do the filter if users do
>>>>>>>> specify "yes I do want the source KTable itself to be materialized,
>>>>>>>> please". So the concern about how to expose the record context in
>>>>>>>> such cases still persists.
>>>>>>>>
>>>>>>>>
>>>>>>>> With that, regarding KIP-159 itself, here are my thoughts:
>>>>>>>>
>>>>>>>> 1) if we restrict the scope of exposing record context only to source
>>>>>>>> KTables / KStreams I felt the KIP itself does not bring much value
>>>>>>>> given its required API change because only the SourceKStream can
>>>>>>>> safely maintain its records context, and for SourceKTable if it is
>>>>>>>> materialized, then even non-stateful operators like Join may still
>>>>>>>> have a concern about exposing the record context.
>>>>>>>>
>>>>>>>> 2) an alternative idea is that we provide the semantics of how record
>>>>>>>> context would be inherited across the operators for KTable / KStream
>>>>>>>> and expose it in all operators (similarly in PAPI we would expose a
>>>>>>>> much simpler contract), and make it a public contract that the
>>>>>>>> Streams library will guarantee moving forward even if we optimize our
>>>>>>>> topology builder; it may not align perfectly with the linear algebraic
>>>>>>>> semantics but is practically applicable for most cases; if users'
>>>>>>>> semantics do not fit the provided contract, then they may need to
>>>>>>>> handle this themselves (embed such information in the value payload,
>>>>>>>> for example).
>>>>>>>>
>>>>>>>> If people do not like the second idea, I'd suggest we hold off on
>>>>>>>> pursuing the first direction since to me its beneficial scope is too
>>>>>>>> limited compared to its cost.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <
>>>>>>>> jan.filip...@trivago.com
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Clearly we show the oldValue to the user. We have to, because we
>>>>>>>>> filter after the store.
>>>>>>>>> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96
>>>>>>>>>
>>>>>>>>> I cannot help you following this. It is really obvious and I am
>>>>>>>>> running out of tools for explaining.
>>>>>>>>>
>>>>>>>>> Thanks for understanding my point to put filter before. Not only
>>>>>>>>> would it make the store smaller. It would make this feature
>>>>>>>>> reasonably possible and the framework easier. Interestingly it would
>>>>>>>>> also help to move IQ into more reasonable directions. And it might
>>>>>>>>> help understand that we do not need any intermediate representation
>>>>>>>>> of the topology,
>>>>>>>>>
>>>>>>>>> KIP-182 I have no clue what everyone has with their "bytestores" so
>>>>>>>>> broken. But putting another store after doesn't help when the store
>>>>>>>>> before
>>>>>>>>> is the problem.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 24.11.2017 05:08, Matthias J. Sax wrote:
>>>>>>>>>
>>>>>>>>>> From a DSL point of view, users only see the new value on a
>>>>>>>>>> KTable#filter anyway. So why should it be an issue that we use
>>>>>>>>>> <newValue,oldValue> pair under the hood?
>>>>>>>>>>
>>>>>>>>>> User sees newValue and gets corresponding RecordContext. I can't
>>>>>>>>>> see any issue here?
>>>>>>>>>>
>>>>>>>>>> I cannot follow here:
>>>>>>>>>>
>>>>>>>>>>> Even when we have a stateful operation last, we move it to the
>>>>>>>>>>> very first processor (KTableSource)
>>>>>>>>>>> and therefore can't present a proper RecordContext.
>>>>>>>>>>
>>>>>>>>>> With regard to `builder.table().filter()`:
>>>>>>>>>>
>>>>>>>>>> I see your point that it would be good to be able to apply the
>>>>>>>>>> filter() first to reduce the state store size of the table. But how
>>>>>>>>>> is this related to KIP-159?
>>>>>>>>>>
>>>>>>>>>> Btw: with KIP-182, I am wondering if this would not be possible, by
>>>>>>>>>> putting a custom dummy store into the table and materializing the
>>>>>>>>>> filter result afterwards? It's not a nice way to do it, but it seems
>>>>>>>>>> to be possible.
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 11/23/17 4:56 AM, Jan Filipiak wrote:
>>>>>>>>>>
>>>>>>>>>>> The comment is valid. It falls exactly into this topic, it has
>>>>>>>>>>> exactly to do with this!
>>>>>>>>>>> Even when we have a stateful operation last, we move it to the
>>>>>>>>>>> very first processor (KTableSource)
>>>>>>>>>>> and therefore can't present a proper RecordContext.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the other Jiras you are referring to. They harm the
>>>>>>>>>>> project more than they do good!
>>>>>>>>>>> There is no need for this kind of optimizer and meta
>>>>>>>>>>> representation and what not. I hope they
>>>>>>>>>>> never get implemented.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 22.11.2017 14:44, Damian Guy wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jan, I think your comment with respect to filtering is valid,
>>>>>>>>>>>> though not for this KIP. We have separate JIRAs for topology
>>>>>>>>>>>> optimization, which this falls into.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure I understand your argument that "we still going to
>>>>>>>>>>>>> present change.oldValue to the filter even though the record
>>>>>>>>>>>>> context() is for change.newValue". Are you referring to
>>>>>>>>>>>>> `KTableFilter#process()`? If yes, could you point me to the
>>>>>>>>>>>>> LOC you are concerned about?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak <
>>>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> a remark of mine that got missed during migration:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is this problem that even though we have
>>>>>>>>>>>>>> source.table.filter.join
>>>>>>>>>>>>>> the state-fullness happens at the table step not at the join
>>>>>>>>>>>>>> step. In a filter we still going to present change.oldValue to
>>>>>>>>>>>>>> the filter even though the record context() is for
>>>>>>>>>>>>>> change.newValue. I would go as far as applying the filter
>>>>>>>>>>>>>> before the table processor. Not to just get KIP-159, but
>>>>>>>>>>>>>> because I think it's a side effect of a non ideal topology
>>>>>>>>>>>>>> layout. If I can filter 99% of my records, my state could be
>>>>>>>>>>>>>> way smaller. Also widely escalates the context of the KIP
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can only see upsides of executing the filter first.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20.11.2017 22:22, Matthias J. Sax wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am moving this back to the DISCUSS thread... Last 10 emails
>>>>>>>>>>>>>>> were sent to VOTE thread.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Copying Guozhang's last summary below. Thanks for this
>>>>>>>>>>>>>>> summary. Very comprehensive!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems, we all agree, that the current implementation of the
>>>>>>>>>>>>>>> context at PAPI level is ok, but we should not leak it into
>>>>>>>>>>>>>>> DSL.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thus, we can go with (2) or (3), where (3) is an extension to
>>>>>>>>>>>>>>> (2) carrying the context to more operators than just sources.
>>>>>>>>>>>>>>> It also seems, that we all agree, that many-to-one operations
>>>>>>>>>>>>>>> void the context.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I still think, that just going with plain (2) is too
>>>>>>>>>>>>>>> restrictive -- but I am also fine if we don't go with the full
>>>>>>>>>>>>>>> proposal of (3).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also note, that the two operators filter() and filterNot()
>>>>>>>>>>>>>>> don't modify the record and thus for both, it would be
>>>>>>>>>>>>>>> absolutely valid to keep the context.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I personally would keep the context for at least all
>>>>>>>>>>>>>>> one-to-one operators. One-to-many is debatable and I am fine
>>>>>>>>>>>>>>> to not carry the context further: at least the offset
>>>>>>>>>>>>>>> information is questionable for this case -- note though, that
>>>>>>>>>>>>>>> semantically, the timestamp is inherited via one-to-many, and
>>>>>>>>>>>>>>> I also think this applies to "topic" and "partition". Thus, I
>>>>>>>>>>>>>>> think it's still valuable information we can carry downstream.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jan: which approach are you referring to as "the approach that
>>>>>>>>>>>>>>>> is on the table would be perfect"?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note that in today's PAPI layer we are already effectively
>>>>>>>>>>>>>>>> exposing the record context which has the issues that we have
>>>>>>>>>>>>>>>> been discussing right now, and its semantics is always
>>>>>>>>>>>>>>>> referring to the "processing record" at hand. More
>>>>>>>>>>>>>>>> specifically, we can think of processing a record a bit
>>>>>>>>>>>>>>>> differently:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) the record traverses the topology from source to sink; it
>>>>>>>>>>>>>>>> may be transformed into a new object or even generate multiple
>>>>>>>>>>>>>>>> new objects (think: branch) along the traversal. And the
>>>>>>>>>>>>>>>> record context is referring to this processing record. Here
>>>>>>>>>>>>>>>> the "lifetime" of the record lasts for the entire topology
>>>>>>>>>>>>>>>> traversal and any new records of this traversal are treated as
>>>>>>>>>>>>>>>> different transformed values of this record (this applies to
>>>>>>>>>>>>>>>> join and aggregations as well).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2) the record being processed is wiped out in the first
>>>>>>>>>>>>>>>> operator after the source, and NEW records are forwarded to
>>>>>>>>>>>>>>>> downstream operators. I.e. each record only lives between two
>>>>>>>>>>>>>>>> adjacent operators; once it has reached the new operator its
>>>>>>>>>>>>>>>> lifetime has ended and new records are generated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think in the past we have talked about Streams under both
>>>>>>>>>>>>>>>> contexts, and we do not have a clear agreement. I agree that
>>>>>>>>>>>>>>>> 2) is logically more understandable for users as it does not
>>>>>>>>>>>>>>>> leak any internal implementation details (e.g. for
>>>>>>>>>>>>>>>> stream-table joins, the table record's traversal ends at the
>>>>>>>>>>>>>>>> join operator as it is only materialized, while the stream
>>>>>>>>>>>>>>>> record's traversal goes through the join operator further down
>>>>>>>>>>>>>>>> until the sinks). However if we are going to interpret
>>>>>>>>>>>>>>>> following 2) above then even for non-stateful operators we
>>>>>>>>>>>>>>>> would not inherit record context. What we're discussing now
>>>>>>>>>>>>>>>> seems to imply a third semantics:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3) a record would traverse "through" one-to-one (non-stateful)
>>>>>>>>>>>>>>>> operators, will "replicate" at one-to-many (non-stateful)
>>>>>>>>>>>>>>>> operators (think: "mapValues") and will "end" at many-to-one
>>>>>>>>>>>>>>>> (stateful) operators where NEW records will be generated and
>>>>>>>>>>>>>>>> forwarded to the downstream operators.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just wanted to lay the ground for discussions so we are all on
>>>>>>>>>>>>>>>> the same page before chatting more.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks a lot for correcting. It is a leftover from the past
>>>>>>>>>>>>>>>> designs when punctuate() was not deprecated.
>>>>>>>>>>>>>>>> I corrected.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
>>>>>>>>>>>>>>>> <matth...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I just re-read the KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> One minor comment: we don't need to introduce any deprecated
>>>>>>>>>>>>>>>>> methods. Thus, RichValueTransformer#punctuate can be removed
>>>>>>>>>>>>>>>>> completely instead of introducing it as deprecated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Otherwise looks good to me.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for being so patient!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I think I'm convinced to not do KAFKA-3907 in this KIP. We
>>>>>>>>>>>>>>>>>> should think carefully if we should add this functionality
>>>>>>>>>>>>>>>>>> to the DSL layer moving forward since from what we
>>>>>>>>>>>>>>>>>> discovered working on it the conclusion is that it would
>>>>>>>>>>>>>>>>>> require revamping the public APIs quite a lot, and it's not
>>>>>>>>>>>>>>>>>> clear if it is a better trade-off than asking users to call
>>>>>>>>>>>>>>>>>> process() instead.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy
>>>>>>>>>>>>>>>>>> <damian....@gmail.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Jeyhun, thanks, looks good.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Do we need to remove the line that says:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>         - on-demand commit() feature
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov <
>>>>>>>>>>>>>>>>>>> je.kari...@gmail.com>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I removed the 'commit()' feature, as we discussed. It
>>>>>>>>>>>>>>>>>>> simplified
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> overall design of KIP a lot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If it is ok, I would like to start a VOTE thread.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax <
>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks. I understand what you are saying, but I don't
>>>>>>>>>>>>>>>>>>>> agree that
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> but also we need a commit() method
>>>>>>>>>>>>>>>>>>>>> I would just not provide `commit()` at DSL level and
>>>>>>>>>>>>>>>>>>>>> close the
>>>>>>>>>>>>>>>>>>>>> corresponding Jira as "not a problem" or similar.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. I agree that this is not the
>>>>>>>>>>>>>>>>>>>>>> best way to do it. A bit of history behind this design.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Prior to doing this, I tried to provide ProcessorContext
>>>>>>>>>>>>>>>>>>>>>> itself as an argument in Rich interfaces. However, we
>>>>>>>>>>>>>>>>>>>>>> don't want to give users that flexibility and “power”.
>>>>>>>>>>>>>>>>>>>>>> Moreover, ProcessorContext contains processor level
>>>>>>>>>>>>>>>>>>>>>> information and not Record level info. The only thing
>>>>>>>>>>>>>>>>>>>>>> we need in ProcessorContext is the commit() method.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> So, as far as I understood, we need record context
>>>>>>>>>>>>>>>>>>>>>> (offset, timestamp, etc.) but also we need a commit()
>>>>>>>>>>>>>>>>>>>>>> method (we don't want to provide ProcessorContext as a
>>>>>>>>>>>>>>>>>>>>>> parameter so users can use ProcessorContext.commit()).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> As a result, I thought to “propagate” the commit() call
>>>>>>>>>>>>>>>>>>>>>> from RecordContext to ProcessorContext().
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> If there is a misunderstanding in motivation/discussion
>>>>>>>>>>>>>>>>>>>>>> of KIP/included jiras please let me know.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri 27. Oct 2017 at 12:39, Matthias J. Sax <
>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I am personally still not convinced, that we should add
>>>>>>>>>>>>>>>>>>>>>>> `commit()` at all.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> @Guozhang: you created the original Jira. Can you
>>>>>>>>>>>>>>>>>>>>>>> elaborate a little bit? Isn't requesting commits a low
>>>>>>>>>>>>>>>>>>>>>>> level API that should not be exposed in the DSL? Just
>>>>>>>>>>>>>>>>>>>>>>> want to understand the motivation better. Why would
>>>>>>>>>>>>>>>>>>>>>>> anybody that uses the DSL ever want to request a
>>>>>>>>>>>>>>>>>>>>>>> commit? To me, requesting commits is useful if you
>>>>>>>>>>>>>>>>>>>>>>> manipulated state explicitly, ie, via Processor API.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Also, for the solution: it seems rather unnatural to
>>>>>>>>>>>>>>>>>>>>>>> me, that we add `commit()` to `RecordContext` -- from
>>>>>>>>>>>>>>>>>>>>>>> my understanding, `RecordContext` is a helper object
>>>>>>>>>>>>>>>>>>>>>>> that provides access to record meta data. Requesting a
>>>>>>>>>>>>>>>>>>>>>>> commit is something quite different. Additionally, a
>>>>>>>>>>>>>>>>>>>>>>> commit does not commit a specific record but a
>>>>>>>>>>>>>>>>>>>>>>> `RecordContext` is for a specific record.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> To me, this does not seem to be a sound API design if
>>>>>>>>>>>>>>>>>>>>>>> we follow this path.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your suggestions.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have some comments, to make sure that there is no
>>>>>>>>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in
>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext, to enforce user to consolidate this call
>>>>>>>>>>>>>>>>>>>>>>>> as "processorContext.recordContext().commit()". And
>>>>>>>>>>>>>>>>>>>>>>>> internal implementation of `ProcessorContext.commit()` in
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> - I think we should not deprecate
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()`. The main intuition that we
>>>>>>>>>>>>>>>>>>>>>>>> introduce `commit()` in `RecordContext` is that,
>>>>>>>>>>>>>>>>>>>>>>>> `RecordContext` is the one which is provided in Rich
>>>>>>>>>>>>>>>>>>>>>>>> interfaces. So if user wants to commit, then there should
>>>>>>>>>>>>>>>>>>>>>>>> be some method inside `RecordContext` to do so.
>>>>>>>>>>>>>>>>>>>>>>>> Internally, `RecordContext.commit()` calls
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` (see the last code snippet in
>>>>>>>>>>>>>>>>>>>>>>>> KIP-159):
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>          @Override
>>>>>>>>>>>>>>>>>>>>>>>>          public void process(final K1 key, final V1 value) {
>>>>>>>>>>>>>>>>>>>>>>>>              // recordContext initialization is added in this KIP
>>>>>>>>>>>>>>>>>>>>>>>>              recordContext = new RecordContext() {
>>>>>>>>>>>>>>>>>>>>>>>>                  @Override
>>>>>>>>>>>>>>>>>>>>>>>>                  public void commit() {
>>>>>>>>>>>>>>>>>>>>>>>>                      context().commit();
>>>>>>>>>>>>>>>>>>>>>>>>                  }
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>                  @Override
>>>>>>>>>>>>>>>>>>>>>>>>                  public long offset() {
>>>>>>>>>>>>>>>>>>>>>>>>                      return context().recordContext().offset();
>>>>>>>>>>>>>>>>>>>>>>>>                  }
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>                  @Override
>>>>>>>>>>>>>>>>>>>>>>>>                  public long timestamp() {
>>>>>>>>>>>>>>>>>>>>>>>>                      return context().recordContext().timestamp();
>>>>>>>>>>>>>>>>>>>>>>>>                  }
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>                  @Override
>>>>>>>>>>>>>>>>>>>>>>>>                  public String topic() {
>>>>>>>>>>>>>>>>>>>>>>>>                      return context().recordContext().topic();
>>>>>>>>>>>>>>>>>>>>>>>>                  }
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>                  @Override
>>>>>>>>>>>>>>>>>>>>>>>>                  public int partition() {
>>>>>>>>>>>>>>>>>>>>>>>>                      return context().recordContext().partition();
>>>>>>>>>>>>>>>>>>>>>>>>                  }
>>>>>>>>>>>>>>>>>>>>>>>>              };
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` in
>>>>>>>>>>>>>>>>>>>>>>>> this case IMO.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the
>>>>>>>>>>>>>>>>>>>>>>>> commit call itself.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to
>>>>>>>>>>>>>>>>>>>>>>>> "transfer" the `ProcessorContext.commit()` call to Rich
>>>>>>>>>>>>>>>>>>>>>>>> interfaces, to support user-specific committing.
>>>>>>>>>>>>>>>>>>>>>>>> To do so, we introduce a `commit()` method in
>>>>>>>>>>>>>>>>>>>>>>>> `RecordContext` only to call `ProcessorContext.commit()`
>>>>>>>>>>>>>>>>>>>>>>>> inside (see the above code snippet).
>>>>>>>>>>>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext` at all, and we leave all its
>>>>>>>>>>>>>>>>>>>>>>>> methods as they are.
>>>>>>>>>>>>>>>>>>>>>>>> In this KIP, we made `RecordContext` the parent class of
>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, just because they share quite a
>>>>>>>>>>>>>>>>>>>>>>>> number of methods and it is logical to enable inheritance
>>>>>>>>>>>>>>>>>>>>>>>> between those two.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call to
>>>>>>>>>>>>>>>>>>>>>>>> a commit() method, is valid only within RecordContext
>>>>>>>>>>>>>>>>>>>>>>>> interface (at least for now), we throw an exception in
>>>>>>>>>>>>>>>>>>>>>>>> ProcessorRecordContext.commit()." and the code snippet
>>>>>>>>>>>>>>>>>>>>>>>> below would need to be updated as well.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> - I think the above explanation covers this as well.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I want to gain some speed on this KIP, as it has gone
>>>>>>>>>>>>>>>>>>>>>>>> through many changes based on user/developer needs, both
>>>>>>>>>>>>>>>>>>>>>>>> documentation- and implementation-wise.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang
>>>>>>>>>>>>>>>>>>>>>>>> <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgotten
>>>>>>>>>>>>>>>>>>>>>>>>> about KAFKA-3907 with this KIP..
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what we
>>>>>>>>>>>>>>>>>>>>>>>>> agreed before, to add the commit() call to
>>>>>>>>>>>>>>>>>>>>>>>>> `RecordContext`. A few minor tweaks on its
>>>>>>>>>>>>>>>>>>>>>>>>> implementation:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in
>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext, to enforce user to consolidate this
>>>>>>>>>>>>>>>>>>>>>>>>> call as "processorContext.recordContext().commit()". And
>>>>>>>>>>>>>>>>>>>>>>>>> internal implementation of `ProcessorContext.commit()`
>>>>>>>>>>>>>>>>>>>>>>>>> in `ProcessorContextImpl` is also changed to this call.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class,
>>>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so that it can implement the
>>>>>>>>>>>>>>>>>>>>>>>>> commit call itself.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call
>>>>>>>>>>>>>>>>>>>>>>>>> to a commit() method, is valid only within RecordContext
>>>>>>>>>>>>>>>>>>>>>>>>> interface (at least for now), we throw an exception in
>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorRecordContext.commit()." and the code snippet
>>>>>>>>>>>>>>>>>>>>>>>>> below would need to be updated as well.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax
>>>>>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Fair point. This is a long discussion and I totally
>>>>>>>>>>>>>>>>>>>>>>>>>> forgot that we discussed this.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Seems I changed my opinion about including
>>>>>>>>>>>>>>>>>>>>>>>>>> KAFKA-3907...
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Happy to hear what others think.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit long
>>>>>>>>>>>>>>>>>>>>>>>>>>> in this thread. I proposed the related issue in the
>>>>>>>>>>>>>>>>>>>>>>>>>>> related KIP discuss thread [1] and got an approval
>>>>>>>>>>>>>>>>>>>>>>>>>>> [2,3].
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Maybe I misunderstood.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND19Asmg1GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1kpct22GKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>>>>>>>>>> [3]
>>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kafka/uyzND1G6TGIGKKXT1?subj=Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+Streams
>>>>>
>>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
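
For readers skimming the quoted thread: the delegation Jeyhun describes, where the record-level `commit()` simply forwards to the processor-level one, can be sketched in isolation as below. This is only an illustration under stated assumptions. `SketchProcessorContext`, `SketchRecordContext`, `RecordContextSketch` and `CountingProcessorContext` are hypothetical stand-ins invented for this sketch, not the real Kafka Streams `ProcessorContext` or `RecordContext` interfaces, and the metadata accessors are flattened onto the stand-in context for brevity.

```java
// Hypothetical stand-in for the processor-level context (NOT the Kafka Streams interface).
interface SketchProcessorContext {
    void commit();      // requests a commit; it is not tied to one record
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Hypothetical stand-in for the record-level view handed to a rich function.
interface SketchRecordContext {
    void commit();
    long offset();
    long timestamp();
    String topic();
    int partition();
}

final class RecordContextSketch {
    private RecordContextSketch() { }

    // Wraps the processor-level context; every call is forwarded unchanged,
    // so commit() here is only a pass-through to the wrapped context.
    static SketchRecordContext wrap(final SketchProcessorContext context) {
        return new SketchRecordContext() {
            @Override public void commit() { context.commit(); }
            @Override public long offset() { return context.offset(); }
            @Override public long timestamp() { return context.timestamp(); }
            @Override public String topic() { return context.topic(); }
            @Override public int partition() { return context.partition(); }
        };
    }
}

// Test double with made-up metadata values that counts commit requests.
final class CountingProcessorContext implements SketchProcessorContext {
    int commitRequests = 0;

    @Override public void commit() { commitRequests++; }
    @Override public long offset() { return 42L; }
    @Override public long timestamp() { return 1_000L; }
    @Override public String topic() { return "input-topic"; }
    @Override public int partition() { return 3; }
}
```

Calling `RecordContextSketch.wrap(ctx).commit()` increments the counter on the wrapped context exactly once. The sketch also makes Matthias's objection visible: the forwarded call carries no record identity, so the wrapper requests a commit in general rather than committing the specific record it describes.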
