Moved this KIP into status "inactive". Feel free to resume it at any time.
-Matthias

On 12/18/17 4:09 PM, Matthias J. Sax wrote:
> I just want to point out that the basic idea is great and that we should
> apply optimizations like "filter first" and others. But we should NOT
> convolute this KIP with orthogonal improvements.
>
> In fact, we have an umbrella JIRA for DSL optimization already:
> https://issues.apache.org/jira/browse/KAFKA-6034
>
> @Jan: feel free to create sub-tasks for new optimization ideas and we
> can take it from there.
>
>
> -Matthias
>
>
> On 12/18/17 7:55 AM, Bill Bejeck wrote:
>> Jan,
>>
>> I apologize for the delayed response.
>>
>> my suggestion would be that instead of
>>>
>>> SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
>>>
>>> we build
>>>
>>> SOURCE -> KTABLEFILTER -> KTABLESOURCE -> JOIN -> SINK
>>
>> I agree that filtering before the KTable source makes sense and would
>> be a positive change to implement.
>>
>> But the situation above is just one scenario out of many we need to
>> consider. I'm not sure we can cover all the implications from
>> different use cases ahead of time.
>>
>> So I'm inclined to agree with Guozhang that we come up with clear
>> "rules" (I use the word rules for lack of a better term) for
>> RecordContext usage and inheritance. That way, going forward, we can
>> have distinct expectations for different use cases.
>>
>> -Bill
>>
>> On Fri, Dec 15, 2017 at 3:57 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Regarding the record context inheritance: I agree it may be a better
>>> idea for now to drop the information when we cannot come up with a
>>> consensus about how the record context should be inherited. Like
>>> Bill, I was a bit worried about the lack of such data lineage
>>> information for troubleshooting in operations or debugging in
>>> coding; but I think we can still try to come up with better
>>> solutions in the future by extending the current protocol, rather
>>> than coming up with something that we later realize we need to change.
>>>
>>> Regarding the table / filter question: I agree with Jan that we
>>> could consider updating the builder so that we push down the filter
>>> earlier than the KTable source that materializes the store; on the
>>> other hand, I think Matthias' point is that even doing this does not
>>> completely exclude the scenarios where you'd have the old/new pairs
>>> in your KTables. For example, consider:
>>>
>>> table1 = stream1.groupBy(...).aggregate(...)
>>> table2 = table1.filter(..., Materialized.as(...))
>>>
>>> In this case table2 is filtering on table1, which is not read from
>>> the source, and hence it already outputs the old/new pairs, so we
>>> still need to consider how to handle it.
>>>
>>>
>>> So I'd suggest the following execution plan towards KIP-159:
>>>
>>> 1) revisit the record context (topic, partition, offset, timestamp)
>>> protocols used at the DSL layer; make it clear at which high-level
>>> operators we should apply certain inheritance rules, and at which
>>> others we should drop such information.
>>> 1.1) modify the lower-level PAPI that the DSL leverages to allow the
>>> caller (the DSL) to modify the record context (note that today, in
>>> the lower-level API, the record context is always passed through
>>> when forwarding to the next processor node)
>>> 2) at the same time, consider optimizing the source KTable filter
>>> case (I think we already have JIRA tickets for this) so that the
>>> filter operator is pushed earlier than the KTABLESOURCE node where
>>> materialization happens.
>>> 3) after 1) is done, come back to KIP-159 and add the proposed APIs.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Dec 7, 2017 at 12:27 PM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>>> Thank you Bill,
>>>>
>>>> I think this is reasonable.
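The old/new pair behavior Guozhang describes (a filter downstream of a stateful operator receives both the old and the new value) can be sketched as a toy model in plain Java. This is an illustrative sketch only: `Change` and `filterChange` are hypothetical names, NOT the real Kafka Streams internal API.

```java
import java.util.function.Predicate;

// Toy model of the <oldValue,newValue> pair a KTable filter receives when
// it sits downstream of a stateful operator (names are illustrative).
public class ChangePairDemo {

    record Change<V>(V oldValue, V newValue) {}

    // The filter must evaluate BOTH sides: the new value decides whether
    // to forward an update, the old value decides whether a retraction is
    // needed downstream.
    static <V> Change<V> filterChange(Change<V> change, Predicate<V> pred) {
        V newV = change.newValue() != null && pred.test(change.newValue())
                ? change.newValue() : null;
        V oldV = change.oldValue() != null && pred.test(change.oldValue())
                ? change.oldValue() : null;
        return new Change<>(oldV, newV);
    }

    public static void main(String[] args) {
        // An aggregate updates a count from 4 to 6; the filter keeps values > 5.
        Change<Integer> filtered = filterChange(new Change<>(4, 6), v -> v > 5);
        // Old value 4 fails the predicate (nothing to retract downstream);
        // new value 6 passes and is forwarded.
        System.out.println(filtered.oldValue() + " -> " + filtered.newValue()); // prints "null -> 6"
    }
}
```

Note that whatever record context is attached to such an update refers to the record that produced the new value; the old value has no context of its own, which is exactly the gap this thread is debating.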
>>>> Do you have any suggestion
>>>> for handling oldValues in cases like
>>>>
>>>> builder.table().filter(RichPredicate).join()
>>>>
>>>> where we process a Change with an old and a new value and don't
>>>> have a record context for the old one?
>>>>
>>>> My suggestion would be that instead of
>>>>
>>>> SOURCE -> KTABLESOURCE -> KTABLEFILTER -> JOIN -> SINK
>>>>
>>>> we build
>>>>
>>>> SOURCE -> KTABLEFILTER -> KTABLESOURCE -> JOIN -> SINK
>>>>
>>>> We should build a topology like this from the beginning and not
>>>> have an optimisation phase afterwards.
>>>>
>>>> Any opinions?
>>>>
>>>> Best Jan
>>>>
>>>>
>>>> On 05.12.2017 17:34, Bill Bejeck wrote:
>>>>
>>>>> Matthias,
>>>>>
>>>>> Overall I agree with what you've presented here.
>>>>>
>>>>> Initially, I was hesitant to remove information from the context
>>>>> of the result records (joins or aggregations), with the thought
>>>>> that when there are unexpected results, the source information
>>>>> would be useful for tracing back where the error could have
>>>>> occurred. But in the case of joins and aggregations, the amount of
>>>>> data needed to do meaningful analysis could be too much. For
>>>>> example, a join result could come from two topics, so you'd need
>>>>> to keep both original topic names, offsets, etc. (plus the broker
>>>>> could have deleted the records in the interim, so even having the
>>>>> offset could provide nothing).
>>>>>
>>>>> I'm a bit long-winded here, but I've come full circle to your
>>>>> original proposal that since joins and aggregations produce
>>>>> fundamentally new types, we drop the corresponding information
>>>>> from the context even in the case of single-topic aggregations.
>>>>>
>>>>> Thanks,
>>>>> Bill
>>>>>
>>>>> On Mon, Dec 4, 2017 at 7:02 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I agree with Guozhang that just exposing metadata at the source
>>>>>> level might not provide too much value.
>>>>>> Furthermore, for timestamps we
>>>>>> already have a well-defined contract and we should exploit it:
>>>>>> timestamps can always be provided in a meaningful way.
>>>>>>
>>>>>> Also, for simple operations like KStream-filter/map the contract
>>>>>> is simple and we can just use it. Same for KTable-filter/map (for
>>>>>> new values).
>>>>>>
>>>>>> For aggregations, joins, and oldValue, I would just drop some
>>>>>> information and return `null`/-1 if the result record has no
>>>>>> semantically meaningful metadata.
>>>>>>
>>>>>> For example, for aggregations, we could preserve the partition
>>>>>> (as all agg-input records have the same partition). For
>>>>>> single-input-topic aggregation (which I guess is the most
>>>>>> prominent case), we can also carry over the topic name (it would
>>>>>> often be an internal repartitioning topic name). Offsets don't
>>>>>> have any semantic interpretation IMHO and we could return -1.
>>>>>>
>>>>>> For joins, we could keep the partition information. Topic and
>>>>>> offset are both unknown/invalid for the output record IMHO.
>>>>>>
>>>>>> For the oldValue case, we can keep the partition and, for the
>>>>>> single-input-topic case, the topic name. The timestamp might be
>>>>>> -1 for now, but after we add timestamps to KTable (which we plan
>>>>>> to do anyway), we can also return a valid timestamp. The offset
>>>>>> would be -1 again (if we stored the offset in the KTable too, we
>>>>>> could provide all offsets as well -- but I don't see much value
>>>>>> in doing this compared to the storage overhead it implies).
>>>>>>
>>>>>>
>>>>>> WDYT?
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/29/17 4:14 AM, Jan Filipiak wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> thank you for the summary and thanks for acknowledging that I do
>>>>>>> have a point here.
>>>>>>>
>>>>>>> I don't like the second idea at all. Hence I started this discussion.
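The inheritance rules Matthias proposes above can be sketched as a small set of per-operator decisions. This is a hypothetical sketch, not the actual KIP-159 API; the class and method names are illustrative, and the join-timestamp rule (max of both inputs) is an assumption of this sketch, not something the proposal specifies.

```java
// Illustrative sketch of the proposed metadata inheritance rules
// (hypothetical names; NOT the actual KIP-159 API).
public class ContextInheritanceDemo {

    record RecordContext(String topic, int partition, long offset, long timestamp) {}

    // Aggregation: all input records share a partition, so keep it; for a
    // single input topic keep the topic name; offsets have no meaning -> -1.
    static RecordContext forAggregation(RecordContext input) {
        return new RecordContext(input.topic(), input.partition(), -1L, input.timestamp());
    }

    // Join: partition is still meaningful (inputs are co-partitioned),
    // topic and offset are not; max-of-timestamps is an assumption here.
    static RecordContext forJoin(RecordContext left, RecordContext right) {
        return new RecordContext(null, left.partition(), -1L,
                Math.max(left.timestamp(), right.timestamp()));
    }

    public static void main(String[] args) {
        RecordContext a = new RecordContext("topicA", 3, 42L, 1000L);
        RecordContext b = new RecordContext("topicB", 3, 77L, 1500L);
        System.out.println(forAggregation(a)); // topic/partition kept, offset -1
        System.out.println(forJoin(a, b));     // partition kept, topic null, offset -1
    }
}
```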
>>>>>>>
>>>>>>> I am just disappointed; back then, when we had the discussion
>>>>>>> about how to refactor the store overloads and IQ handling, I
>>>>>>> knew the path we were taking was wrong. Having problems
>>>>>>> implementing these kinds of features (which are really simple)
>>>>>>> is just a symptom of the messed-up IQ implementation. I really
>>>>>>> wish I could have convinced you guys back then. To be honest,
>>>>>>> with IQ we can continue here as we materialize but would not
>>>>>>> send oldValue, but with join you're out of luck with the current
>>>>>>> setup.
>>>>>>>
>>>>>>> I of course recommend not introducing any optimizations here.
>>>>>>> I'd recommend going towards what I recommended already back
>>>>>>> then. So I wouldn't say we need to optimize anything later; we
>>>>>>> need to build the topology better in the first place.
>>>>>>>
>>>>>>>
>>>>>>> On 28.11.2017 21:00, Guozhang Wang wrote:
>>>>>>>
>>>>>>>> Jan,
>>>>>>>>
>>>>>>>> Thanks for your input. I can understand now that the oldValue
>>>>>>>> is also exposed in the user-customized `filter` function, and
>>>>>>>> hence which record context we should expose is a problem. And I
>>>>>>>> think it does bring a good point to consider for KIP-159. The
>>>>>>>> discussion may be a bit confusing to readers, though, so I'd
>>>>>>>> like to summarize the status quo along with a proposal:
>>>>>>>>
>>>>>>>> In today's Streams DSL, when a KTable is created either from a
>>>>>>>> source topic or from a stateful operator, we will materialize
>>>>>>>> the KTable with a backing state store; on the other hand,
>>>>>>>> KTables created from a non-stateful operator like filter will
>>>>>>>> not be backed by a state store by default unless users indicate
>>>>>>>> so (e.g. using the overloaded function with the queryable name
>>>>>>>> or store supplier).
>>>>>>>>
>>>>>>>> For example:
>>>>>>>>
>>>>>>>> KTable table1 = builder.table("topic");            // a state store created for table1
>>>>>>>> KTable table2 = table1.filter(..);                 // no state store created for table2
>>>>>>>> KTable table3 = table1.filter(.., "storeName");    // a state store created for table3
>>>>>>>> KTable table4 = table1.groupBy(..).aggregate(..);  // a state store created for table4
>>>>>>>>
>>>>>>>> Because of that, the filter() operator above on table1 will
>>>>>>>> always be exposed with oldValue and newValue. Damian's point is
>>>>>>>> that we may optimize the first case such that table1 will only
>>>>>>>> be materialized if users asked for it (e.g. using the
>>>>>>>> overloaded function with a store supplier), in which case we do
>>>>>>>> not need to pass newValue / oldValue pairs (I think this is
>>>>>>>> what Jan suggests as well, i.e. do the filtering before
>>>>>>>> materializing, so that we can have a smaller backing state
>>>>>>>> store as well). But this optimization does not eliminate the
>>>>>>>> possibility that we may still need to filter if users do
>>>>>>>> specify "yes, I do want the source KTable itself to be
>>>>>>>> materialized, please". So the concern about how to expose the
>>>>>>>> record context in such cases still persists.
>>>>>>>>
>>>>>>>>
>>>>>>>> With that, regarding KIP-159 itself, here are my thoughts:
>>>>>>>>
>>>>>>>> 1) if we restrict the scope of exposing the record context only
>>>>>>>> to source KTables / KStreams, I feel the KIP itself does not
>>>>>>>> bring much value given its required API change, because only
>>>>>>>> the source KStream can safely maintain its record context; and
>>>>>>>> for a source KTable, if it is materialized, then even
>>>>>>>> non-stateful operators like join may still have a concern about
>>>>>>>> exposing the record context.
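The "filter before materializing" point can be illustrated with a toy model in plain Java: if the predicate drops most records, materializing after the filter keeps the store at a fraction of the size. Plain `HashMap`s stand in for state stores here; this is a sketch of the idea, not the Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of "push the filter before materialization": plain maps stand
// in for RocksDB state stores (illustrative, NOT the Streams API).
public class FilterPushdownDemo {

    static Map<String, Integer> materializeThenFilter(Map<String, Integer> input,
                                                      Predicate<Integer> pred) {
        // The full source table is materialized first; the filter only runs
        // downstream of the store, so the store stays large.
        return new HashMap<>(input);
    }

    static Map<String, Integer> filterThenMaterialize(Map<String, Integer> input,
                                                      Predicate<Integer> pred) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, Integer> e : input.entrySet()) {
            if (pred.test(e.getValue())) {
                store.put(e.getKey(), e.getValue()); // only survivors are stored
            }
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Integer> input = new HashMap<>();
        for (int i = 0; i < 100; i++) input.put("k" + i, i);
        Predicate<Integer> pred = v -> v >= 99;  // drops 99% of the records
        System.out.println(materializeThenFilter(input, pred).size()); // prints 100
        System.out.println(filterThenMaterialize(input, pred).size()); // prints 1
    }
}
```

As Guozhang notes, though, this optimization only helps when the user did not explicitly ask for the source KTable to be materialized.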
>>>>>>>>
>>>>>>>> 2) an alternative idea is that we define the semantics of how
>>>>>>>> the record context is inherited across operators for KTable /
>>>>>>>> KStream and expose it in all operators (similarly, in the PAPI
>>>>>>>> we would expose a much simpler contract), and make it a public
>>>>>>>> contract that the Streams library will guarantee moving forward
>>>>>>>> even as we optimize our topology builder; it may not align
>>>>>>>> perfectly with the linear algebraic semantics but is
>>>>>>>> practically applicable for most cases; if users' semantics do
>>>>>>>> not fit the provided contract, then they may need to handle
>>>>>>>> this themselves (embed such information in the value payload,
>>>>>>>> for example).
>>>>>>>>
>>>>>>>> If people do not like the second idea, I'd suggest we hold off
>>>>>>>> on pursuing the first direction, since to me its beneficial
>>>>>>>> scope is too limited compared to its cost.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Nov 24, 2017 at 1:39 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Clearly we show the oldValue to the user. We have to, because
>>>>>>>>> we filter after the store.
>>>>>>>>> https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96
>>>>>>>>>
>>>>>>>>> I cannot help you follow this. It is really obvious and I am
>>>>>>>>> running out of tools for explaining.
>>>>>>>>>
>>>>>>>>> Thanks for understanding my point about putting the filter
>>>>>>>>> before. Not only would it make the store smaller, it would
>>>>>>>>> make this feature reasonably possible and the framework
>>>>>>>>> easier. Interestingly, it would also help to move IQ in more
>>>>>>>>> reasonable directions.
>>>>>>>>> And it might help us understand that we do not need any
>>>>>>>>> intermediate representation of the topology.
>>>>>>>>>
>>>>>>>>> KIP-182: I have no clue why everyone has their "byte stores"
>>>>>>>>> so broken. But putting another store after doesn't help when
>>>>>>>>> the store before is the problem.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 24.11.2017 05:08, Matthias J. Sax wrote:
>>>>>>>>>
>>>>>>>>>> From a DSL point of view, users only see the new value on a
>>>>>>>>>> KTable#filter anyway. So why should it be an issue that we
>>>>>>>>>> use a <newValue,oldValue> pair under the hood?
>>>>>>>>>>
>>>>>>>>>> The user sees newValue and gets the corresponding
>>>>>>>>>> RecordContext. I can't see any issue here.
>>>>>>>>>>
>>>>>>>>>> I cannot follow here:
>>>>>>>>>>
>>>>>>>>>>> Even when we have a stateful operation last, we move it to
>>>>>>>>>>> the very first processor (KTableSource) and therefore can't
>>>>>>>>>>> present a proper RecordContext.
>>>>>>>>>>
>>>>>>>>>> With regard to `builder.table().filter()`:
>>>>>>>>>>
>>>>>>>>>> I see your point that it would be good to be able to apply
>>>>>>>>>> the filter() first to reduce the state store size of the
>>>>>>>>>> table. But how is this related to KIP-159?
>>>>>>>>>>
>>>>>>>>>> Btw: with KIP-182, I am wondering if this would not be
>>>>>>>>>> possible by putting a custom dummy store into the table and
>>>>>>>>>> materializing the filter result afterwards? It's not a nice
>>>>>>>>>> way to do it, but it seems to be possible.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 11/23/17 4:56 AM, Jan Filipiak wrote:
>>>>>>>>>>
>>>>>>>>>>> The comment is valid. It falls exactly into this topic; it
>>>>>>>>>>> has exactly to do with this!
>>>>>>>>>>> Even when we have a stateful operation last,
>>>>>>>>>>> we move it to the very first processor (KTableSource)
>>>>>>>>>>> and therefore can't present a proper RecordContext.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the other JIRAs you are referring to: they harm
>>>>>>>>>>> the project more than they do good! There is no need for
>>>>>>>>>>> this kind of optimizer and meta representation and what not.
>>>>>>>>>>> I hope they never get implemented.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 22.11.2017 14:44, Damian Guy wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Jan, I think your comment with respect to filtering is
>>>>>>>>>>>> valid, though not for this KIP. We have separate JIRAs for
>>>>>>>>>>>> topology optimization, which this falls under.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Damian
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 22 Nov 2017 at 02:25 Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure I understand your argument that "we still going
>>>>>>>>>>>>> to present change.oldValue to the filter even though the
>>>>>>>>>>>>> record context() is for change.newValue". Are you
>>>>>>>>>>>>> referring to `KTableFilter#process()`? If yes, could you
>>>>>>>>>>>>> point me to which LOC you are concerned about?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak
>>>>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> a remark of mine that got missed during migration:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There is this problem that even though we have
>>>>>>>>>>>>>> source.table.filter.join, the statefulness happens at the
>>>>>>>>>>>>>> table step, not at the join step.
>>>>>>>>>>>>>> In a filter we are still going to present
>>>>>>>>>>>>>> change.oldValue to the filter even though the record
>>>>>>>>>>>>>> context() is for change.newValue. I would go as far as
>>>>>>>>>>>>>> applying the filter before the table processor. Not just
>>>>>>>>>>>>>> to get KIP-159, but because I think it's a side effect of
>>>>>>>>>>>>>> a non-ideal topology layout. If I can filter 99% of my
>>>>>>>>>>>>>> records, my state could be way smaller. It also widely
>>>>>>>>>>>>>> escalates the context of the KIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can only see upsides to executing the filter first.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20.11.2017 22:22, Matthias J. Sax wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am moving this back to the DISCUSS thread... The last
>>>>>>>>>>>>>>> 10 emails were sent to the VOTE thread.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Copying Guozhang's last summary below. Thanks for this
>>>>>>>>>>>>>>> summary. Very comprehensive!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems we all agree that the current implementation of
>>>>>>>>>>>>>>> the context at the PAPI level is ok, but we should not
>>>>>>>>>>>>>>> leak it into the DSL.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thus, we can go with (2) or (3), where (3) is an
>>>>>>>>>>>>>>> extension to (2) carrying the context to more operators
>>>>>>>>>>>>>>> than just sources. It also seems that we all agree that
>>>>>>>>>>>>>>> many-to-one operations void the context.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I still think that just going with plain (2) is too
>>>>>>>>>>>>>>> restrictive -- but I am also fine if we don't go with
>>>>>>>>>>>>>>> the full proposal of (3).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also note that the two operators filter() and
>>>>>>>>>>>>>>> filterNot() don't modify the record, and thus for both
>>>>>>>>>>>>>>> it would be absolutely valid to keep the context.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I personally would keep the context for at least all
>>>>>>>>>>>>>>> one-to-one operators. One-to-many is debatable and I am
>>>>>>>>>>>>>>> fine with not carrying the context further: at least the
>>>>>>>>>>>>>>> offset information is questionable for this case --
>>>>>>>>>>>>>>> note, though, that semantically the timestamp is
>>>>>>>>>>>>>>> inherited via one-to-many, and I also think this applies
>>>>>>>>>>>>>>> to "topic" and "partition". Thus, I think it's still
>>>>>>>>>>>>>>> valuable information we can carry downstream.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan: which approach are you referring to as "the
>>>>>>>>>>>>>>> approach that is on the table would be perfect"?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note that in today's PAPI layer we are already
>>>>>>>>>>>>>>>> effectively exposing the record context, which has the
>>>>>>>>>>>>>>>> issues that we have been discussing right now, and its
>>>>>>>>>>>>>>>> semantics always refer to the "processing record" at
>>>>>>>>>>>>>>>> hand.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> More specifically, we can think of processing a record
>>>>>>>>>>>>>>>> a bit differently:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1) the record traverses the topology from source to
>>>>>>>>>>>>>>>> sink; it may be transformed into a new object or even
>>>>>>>>>>>>>>>> generate multiple new objects (think: branch) along the
>>>>>>>>>>>>>>>> traversal. And the record context refers to this
>>>>>>>>>>>>>>>> processing record. Here the "lifetime" of the record
>>>>>>>>>>>>>>>> lasts for the entire topology traversal, and any new
>>>>>>>>>>>>>>>> records of this traversal are treated as different
>>>>>>>>>>>>>>>> transformed values of this record (this applies to
>>>>>>>>>>>>>>>> joins and aggregations as well).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2) the record being processed is wiped out in the first
>>>>>>>>>>>>>>>> operator after the source, and NEW records are
>>>>>>>>>>>>>>>> forwarded to downstream operators. I.e. each record
>>>>>>>>>>>>>>>> only lives between two adjacent operators; once it
>>>>>>>>>>>>>>>> reaches the new operator, its lifetime has ended and
>>>>>>>>>>>>>>>> new records are generated.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think in the past we have talked about Streams under
>>>>>>>>>>>>>>>> both contexts, and we do not have a clear agreement.
>>>>>>>>>>>>>>>> I agree that 2) is logically more understandable for
>>>>>>>>>>>>>>>> users, as it does not leak any internal implementation
>>>>>>>>>>>>>>>> details (e.g. for stream-table joins, the table
>>>>>>>>>>>>>>>> record's traversal ends at the join operator as it is
>>>>>>>>>>>>>>>> only materialized, while the stream record's traversal
>>>>>>>>>>>>>>>> goes through the join operator further down until the
>>>>>>>>>>>>>>>> sinks). However, if we are going to interpret things
>>>>>>>>>>>>>>>> following 2) above, then even for non-stateful
>>>>>>>>>>>>>>>> operators we would not inherit the record context. What
>>>>>>>>>>>>>>>> we're discussing now seems to infer a third semantics:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3) a record would traverse "through" one-to-one
>>>>>>>>>>>>>>>> (non-stateful) operators, will "replicate" at
>>>>>>>>>>>>>>>> one-to-many (non-stateful) operators (think:
>>>>>>>>>>>>>>>> "mapValues"), and will "end" at many-to-one (stateful)
>>>>>>>>>>>>>>>> operators, where NEW records will be generated and
>>>>>>>>>>>>>>>> forwarded to the downstream operators.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just wanted to lay the ground for discussions so we are
>>>>>>>>>>>>>>>> all on the same page before chatting more.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 11/6/17 1:41 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks a lot for correcting. It is a leftover from the
>>>>>>>>>>>>>>>>> past designs when punctuate() was not deprecated.
>>>>>>>>>>>>>>>>> I corrected.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 5:30 PM Matthias J. Sax
>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I just re-read the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One minor comment: we don't need to introduce any
>>>>>>>>>>>>>>>>>> deprecated methods. Thus,
>>>>>>>>>>>>>>>>>> RichValueTransformer#punctuate can be removed
>>>>>>>>>>>>>>>>>> completely instead of introducing it as deprecated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Otherwise looks good to me.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for being so patient!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 11/1/17 9:16 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Jeyhun,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I think I'm convinced not to do KAFKA-3907 in this
>>>>>>>>>>>>>>>>>>> KIP. We should think carefully about whether we
>>>>>>>>>>>>>>>>>>> should add this functionality to the DSL layer
>>>>>>>>>>>>>>>>>>> moving forward, since from what we discovered
>>>>>>>>>>>>>>>>>>> working on it, the conclusion is that it would
>>>>>>>>>>>>>>>>>>> require revamping the public APIs quite a lot, and
>>>>>>>>>>>>>>>>>>> it's not clear if it is a good trade-off versus
>>>>>>>>>>>>>>>>>>> asking users to call process() instead.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Nov 1, 2017 at 4:50 AM, Damian Guy
>>>>>>>>>>>>>>>>>>> <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Jeyhun, thanks, looks good.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Do we need to remove the line that says:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> - on-demand commit() feature
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, 31 Oct 2017 at 23:07 Jeyhun Karimov
>>>>>>>>>>>>>>>>>>>> <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I removed the 'commit()' feature, as we discussed.
>>>>>>>>>>>>>>>>>>>>> It simplified the overall design of the KIP a lot.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If it is ok, I would like to start a VOTE thread.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Oct 27, 2017 at 5:28 PM Matthias J. Sax
>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks. I understand what you are saying, but I
>>>>>>>>>>>>>>>>>>>>>> don't agree that
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> but also we need a commit() method
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I would just not provide `commit()` at the DSL
>>>>>>>>>>>>>>>>>>>>>> level and close the corresponding JIRA as "not a
>>>>>>>>>>>>>>>>>>>>>> problem" or similar.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On 10/27/17 3:42 PM, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. I agree that this is
>>>>>>>>>>>>>>>>>>>>>>> not the best way to do it. A bit of history
>>>>>>>>>>>>>>>>>>>>>>> behind this design.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Prior to doing this, I tried to provide
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext itself as an argument in the
>>>>>>>>>>>>>>>>>>>>>>> Rich interfaces. However, we don't want to give
>>>>>>>>>>>>>>>>>>>>>>> users that flexibility and "power". Moreover,
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext contains processor-level
>>>>>>>>>>>>>>>>>>>>>>> information and not record-level info. The only
>>>>>>>>>>>>>>>>>>>>>>> thing we need in ProcessorContext is the
>>>>>>>>>>>>>>>>>>>>>>> commit() method.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> So, as far as I understood, we need the record
>>>>>>>>>>>>>>>>>>>>>>> context (offset, timestamp, etc.) but we also
>>>>>>>>>>>>>>>>>>>>>>> need a commit() method (we don't want to provide
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext as a parameter so users can use
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext.commit()).
>>>>>>>>>>>>>>>>>>>>>>> As a result, I thought to "propagate" the
>>>>>>>>>>>>>>>>>>>>>>> commit() call from RecordContext to
>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> If there is a misunderstanding in the
>>>>>>>>>>>>>>>>>>>>>>> motivation/discussion of the KIP/included JIRAs,
>>>>>>>>>>>>>>>>>>>>>>> please let me know.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri 27.
>>>>>>>>>>>>>>>>>>>>>>> Oct 2017 at 12:39, Matthias J. Sax
>>>>>>>>>>>>>>>>>>>>>>> <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am personally still not convinced that we
>>>>>>>>>>>>>>>>>>>>>>>> should add `commit()` at all.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> @Guozhang: you created the original JIRA. Can
>>>>>>>>>>>>>>>>>>>>>>>> you elaborate a little bit? Isn't requesting
>>>>>>>>>>>>>>>>>>>>>>>> commits a low-level API that should not be
>>>>>>>>>>>>>>>>>>>>>>>> exposed in the DSL? Just want to understand the
>>>>>>>>>>>>>>>>>>>>>>>> motivation better. Why would anybody that uses
>>>>>>>>>>>>>>>>>>>>>>>> the DSL ever want to request a commit? To me,
>>>>>>>>>>>>>>>>>>>>>>>> requesting commits is useful if you manipulated
>>>>>>>>>>>>>>>>>>>>>>>> state explicitly, i.e. via the Processor API.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Also, for the solution: it seems rather
>>>>>>>>>>>>>>>>>>>>>>>> unnatural to me that we add `commit()` to
>>>>>>>>>>>>>>>>>>>>>>>> `RecordContext` -- from my understanding,
>>>>>>>>>>>>>>>>>>>>>>>> `RecordContext` is a helper object that
>>>>>>>>>>>>>>>>>>>>>>>> provides access to record metadata. Requesting
>>>>>>>>>>>>>>>>>>>>>>>> a commit is something quite different.
Additionally, a >>>>>>>>>>>>>>>>>>>>> commit >>>>>>>>>>>>>>>>>>>>> does >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> not >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> commit a specific record but a `RecrodContext` is for a >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> specific >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> record. >>>>>>>>>>>>>>>>>>>>> To me, this does not seem to be a sound API design if we >>>>>>>>>>>>>>>>>>>>> follow >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> path. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On 10/26/17 10:41 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Thanks for your suggestions. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I have some comments, to make sure that there is no >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> misunderstanding. >>>>>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in >>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> enforce >>>>>>>>>>>>>> >>>>>>>>>>>>>>> user to consolidate this call as >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". And >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> internal >>>>>> >>>>>>> implementation >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` >>> is >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> changed >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> this call. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> - I think we should not deprecate >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()`. 
>>>>>>>>>>>>>>>>>>>>>>>> The >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> main >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> intuition that we introduce `commit()` in >>>>>>>>>>>>>>>>>>>>>> `RecordContext` is >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> that, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> `RecordContext` is the one which is provided in Rich >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> interfaces. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> So >>>>>>>>>>>>>> >>>>>>>>>>>>>>> if >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> user >>>>>>>>>>>>>>>>>>>>>> wants to commit, then there should be some method >>> inside >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> `RecordContext` >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> do so. Internally, `RecordContext.commit()` calls >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` (see the last code >>>>>>>>>>>>>>>>>>>>>>>> snippet in >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-159): >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> public void process(final K1 key, final V1 >>>>>>>>>>>>>>>>>>>>> value) { >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> recordContext = new RecordContext() >>>>>>>>>>>>>>>>>>>>>>>> { // >>>>>>>>>>>>>>>>>>>>>>>> recordContext initialization is added in this KIP >>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>> public void commit() { >>>>>>>>>>>>>>>>>>>>>>>> context().commit(); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>> public long offset() { >>>>>>>>>>>>>>>>>>>>>>>> return >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> context().recordContext().offs >>>>>> >>>>>>> et(); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>> public long timestamp() { >>>>>>>>>>>>>>>>>>>>>>>> return 
>>>>>>>>>>>>>>>>>>>>>>>> context().recordContext().timestamp(); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>> public String topic() { >>>>>>>>>>>>>>>>>>>>>>>> return >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> context().recordContext().topi >>>>>> >>>>>>> c(); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> @Override >>>>>>>>>>>>>>>>>>>>>>>> public int partition() { >>>>>>>>>>>>>>>>>>>>>>>> return >>>>>>>>>>>>>>>>>>>>>>>> context().recordContext().partition(); >>>>>>>>>>>>>>>>>>>>>>>> } >>>>>>>>>>>>>>>>>>>>>>>> }; >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> So, we cannot deprecate `ProcessorContext.commit()` >>> in >>>>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> case >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> IMO. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> that it can implement the commit call itself. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> - Actually, I don't think that we need `commit()` in >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`. The main intuition is to >>>>>>>>>>>>>>>>>>>>>>>> "transfer" >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` call to Rich interfaces, >>> to >>>>>>>>>>>>>>>>>>>>>>>> support >>>>>>>>>>>>>>>>>>>>>>>> user-specific committing. >>>>>>>>>>>>>>>>>>>>>>>> To do so, we introduce `commit()` method in >>>>>>>>>>>>>>>>>>>>>>>> `RecordContext()` >>>>>>>>>>>>>>>>>>>>>>>> just >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> only >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> call ProcessorContext.commit() inside. 
(see the above >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> code >>>>>> >>>>>>> snippet) >>>>>>>>>>>>>>>>>>>>>>>> So, in Rich interfaces, we are not dealing with >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext` >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> at all, and we leave all its methods as it is. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> In this KIP, we made `RecordContext` to be the parent >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> class of >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, just because of they share >>>>>>>>>>>>>>>>>>>>>>>> quite >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> amount >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>> >>>>>>>>>>>>>>> methods and it is logical to enable inheritance between >>>>>>>>>>>>>>>>>>>>>> those >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> two. >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, call >>> to a >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> commit() >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> method, >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> is valid only within RecordContext interface (at least >>> for >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> now), >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>> >>>>>>>>>>>>>>> throw >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>>>>>>>> snippet >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> below would need to be updated as well. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> - I think above explanation covers this as well. 
>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> I want to gain some speed to this KIP, as it has gone >>>>>>>>>>>>>>>>>>>>>>>> though >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> many >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> changes >>>>>>>>>>>>>> >>>>>>>>>>>>>>> based on user/developer needs, both in >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> documentation-/implementation-wise. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Cheers, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Jeyhun >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 24, 2017 at 1:41 AM Guozhang Wang < >>>>>>>>>>>>>>>>>>>>>>>> wangg...@gmail.com> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for the information Jeyhun. I had also forgot >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> about >>>>>> >>>>>>> KAFKA-3907 >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> this KIP.. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Thinking a bit more, I'm now inclined to go with what >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> we >>>>>> >>>>>>> agreed >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> before, >>>>>>>>>>>>>> >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> add the commit() call to `RecordContext`. A few minor >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> tweaks on >>>>>>>>>>>>>>>>>>>>>>>> its >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> implementation: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 1. Maybe we can deprecate the `commit()` in >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> ProcessorContext, >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> enforce >>>>>>>>>>>>>> >>>>>>>>>>>>>>> user to consolidate this call as >>>>>>>>>>>>>>>>>>>>>>> "processorContext.recordContext().commit()". 
And >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> internal >>>>>> >>>>>>> implementation >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>>>>>>> `ProcessorContext.commit()` in `ProcessorContextImpl` >>> is >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> changed >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> this call. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 2. Add the `task` reference to the impl class, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> `ProcessorRecordContext`, so >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> that it can implement the commit call itself. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 3. In the wiki page, the statement that "However, >>>>>>>>>>>>>>>>>>>>>>>>> call to a >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> commit() >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> method, >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> is valid only within RecordContext interface (at least >>>>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> now), >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>> >>>>>>>>>>>>>>> throw >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> an exception in ProcessorRecordContext.commit()." and >>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> code >>>>>>>>>>>>>>>>>>>>>>>> snippet >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> below would need to be updated as well. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 23, 2017 at 1:40 PM, Matthias J. Sax < >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>> Fair point. 
This is a long discussion and I totally >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> forgot >>>>>> >>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>> >>>>>>>>>>>>>>> discussed this. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> Seems I changed my opinion about including >>> KAFKA-3907... >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> Happy to hear what others think. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> On 10/23/17 1:20 PM, Jeyhun Karimov wrote: >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> It is probably my bad, the discussion was a bit >>>>>>>>>>>>>>>>>>>>>>>>>>> long in >>>>>>>>>>>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> thread. I >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> proposed the related issue in the related KIP >>> discuss >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> thread >>>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> got >>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> approval [2,3]. >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Maybe I misunderstood. 
>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kaf >>>>>>>>>>>>>>>>>>>>>>>>>>> ka/uyzND19Asmg1GKKXT1?subj= >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+ >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Streams >>>>>> >>>>>>> [2] >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/Kaf >>>>>>>>>>>>>>>>>>>>>>>>>>> ka/uyzND1kpct22GKKXT1?subj= >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+ >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Streams >>>>>> >>>>>>> [3] >>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> http://search-hadoop.com/m/ >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Kafka/uyzND1G6TGIGKKXT1?subj= >>>>>> >>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>> Re+DISCUSS+KIP+159+Introducing+Rich+functions+to+ >>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Streams >>>>> >>>>> >>> >>> >>> -- >>> -- Guozhang >>> >> >
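The delegation pattern debated in this thread — a per-record `RecordContext` that exposes record metadata and forwards `commit()` to the processor-level context, so Rich interfaces never see `ProcessorContext` itself — can be sketched with minimal stand-in types. Note these interfaces and the `wrap` helper are illustrative inventions, not the real Kafka Streams API; they only mirror the shape of the KIP-159 snippet above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RecordContextSketch {

    // Stand-in for the processor-level context: the only capability the
    // thread says Rich interfaces need from it is commit().
    interface ProcessorContext {
        void commit();
    }

    // Stand-in for the record-level context handed to Rich interfaces:
    // record metadata plus a commit() that merely forwards.
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
        void commit();
    }

    // Builds a RecordContext for one record; commit() delegates to the
    // wrapped ProcessorContext, as in the anonymous class of the KIP snippet.
    static RecordContext wrap(final ProcessorContext ctx, final String topic,
                              final int partition, final long offset,
                              final long timestamp) {
        return new RecordContext() {
            @Override public void commit()    { ctx.commit(); }
            @Override public long offset()    { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic()   { return topic; }
            @Override public int partition()  { return partition; }
        };
    }

    public static void main(String[] args) {
        // Count commit requests to show the call reaches the processor level.
        AtomicInteger commits = new AtomicInteger();
        ProcessorContext processorContext = commits::incrementAndGet;

        RecordContext rc = wrap(processorContext, "orders", 3, 42L, 1509000000000L);
        rc.commit();  // user code touches only RecordContext

        System.out.println(rc.topic() + "/" + rc.partition() + "@" + rc.offset()
                + " commits=" + commits.get());
    }
}
```

The sketch also makes Matthias's objection concrete: `commit()` on this object cannot commit "the record" — it can only forward a processor-wide commit request, while every other method is genuinely record-scoped.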