@Jeyhun, I understand that the discussion about KIP-159 is dragging on, so while we move forward with the discussion on the whether / how of KIP-159, maybe you can start implementing the non-overlapping part of the KIP-149 APIs to get unblocked?
Guozhang

On Sun, Nov 19, 2017 at 12:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jan: which approach are you referring to as "the approach that is on the table would be perfect"?
>
> Note that in today's PAPI layer we are already effectively exposing the record context, which has the issues we have been discussing right now, and its semantics always refers to the "processing record" at hand. More specifically, we can think of processing a record in a few different ways:
>
> 1) The record traverses the topology from source to sink; it may be transformed into a new object or even generate multiple new objects (think: branch) along the traversal, and the record context refers to this processing record. Here the "lifetime" of the record lasts for the entire topology traversal, and any new records of this traversal are treated as different transformed values of this record (this applies to joins and aggregations as well).
>
> 2) The record being processed is wiped out in the first operator after the source, and NEW records are forwarded to downstream operators. I.e. each record only lives between two adjacent operators; once it reaches the next operator its lifetime has ended and new records are generated.
>
> I think in the past we have talked about Streams under both contexts, and we do not have a clear agreement. I agree that 2) is logically easier for users to understand, as it does not leak any internal implementation details (e.g. for stream-table joins, the table record's traversal ends at the join operator as it is only materialized, while the stream record's traversal goes through the join operator further down until the sinks). However, if we interpret things following 2) above, then even for non-stateful operators we would not inherit the record context. What we're discussing now seems to imply a third semantics:
>
> 3) A record traverses "through" one-to-one (non-stateful) operators, "replicates" at one-to-many (non-stateful) operators (think: "mapValues"), and "ends" at many-to-one (stateful) operators, where NEW records are generated and forwarded to the downstream operators.
>
> Just wanted to lay the ground for discussion so we are all on the same page before chatting more.
>
>
> Guozhang
>
>
> On Sat, Nov 18, 2017 at 3:10 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> Hi,
>>
>> not an issue at all. IMO the approach that is on the table would be perfect
>>
>>
>> On 18.11.2017 10:58, Jeyhun Karimov wrote:
>>
>>> Hi,
>>>
>>> I did not expect that Context would be this much of an issue. Instead of applying different semantics for different operators, I think we should remove this feature completely.
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>>
>>> On Sat 18. Nov 2017 at 07:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>
>>>> Yes, the mail said only join so I wanted to clarify.
>>>>
>>>>
>>>> On 17.11.2017 19:05, Matthias J. Sax wrote:
>>>>
>>>>> Yes. But I think an aggregation is a many-to-one operation, too.
>>>>>
>>>>> For the stripping-off part: internally, we can just keep some record context but not allow users to access it (because the context does not make sense for them) by hiding the corresponding APIs.
>>>>>
>>>>>
>>>>> -Matthias
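For reference, the "record context in today's PAPI layer" mentioned in the Nov 19 mail above is what a low-level Processor can already read from its ProcessorContext. A rough sketch (the processor class name and the printed message are made up for illustration; the metadata accessors are the ones available on ProcessorContext at the time of this thread):

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Illustrative processor that reads the metadata of the record currently
    // being processed from the ProcessorContext.
    public class AuditProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            final ProcessorContext ctx = context();
            // These accessors refer to the input record that triggered this call.
            final String topic = ctx.topic();
            final int partition = ctx.partition();
            final long offset = ctx.offset();
            final long timestamp = ctx.timestamp();
            System.out.println("record from " + topic + "-" + partition + "@" + offset + " ts=" + timestamp);
            // Forward the record unchanged to downstream processors.
            ctx.forward(key, value);
        }
    }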
>>>>>
>>>>> On 11/16/17 10:05 PM, Guozhang Wang wrote:
>>>>>
>>>>>> Matthias,
>>>>>>
>>>>>> For this idea, are you proposing that for any many-to-one mapping operations (for now only join operators), we will strip off the record context in the resulting records and claim "we cannot infer its traced context anymore"?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 16, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Any thoughts about my latest proposal?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/10/17 10:02 PM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I think this is the better way. Naming is always tricky; Source is kinda taken. I had TopicBackedK[Source|Table] in mind, but for the user it's way better already IMHO.
>>>>>>>>
>>>>>>>> Thank you for reconsidering
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10.11.2017 22:48, Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>>> I was thinking about the source stream/table idea once more and it seems it would not be too hard to implement:
>>>>>>>>>
>>>>>>>>> We add two new classes
>>>>>>>>>
>>>>>>>>>   SourceKStream extends KStream
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>>   SourceKTable extends KTable
>>>>>>>>>
>>>>>>>>> and return both from StreamsBuilder#stream and StreamsBuilder#table.
>>>>>>>>>
>>>>>>>>> As both are sub-classes, this change is backward compatible. We change the return type of any single-record transform to these new types, too, and use KStream/KTable as the return type of any multi-record operation.
>>>>>>>>>
>>>>>>>>> The new RecordContext API is added to both new classes. For the old classes, we only implement KIP-149 to get access to the key.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 11/9/17 9:13 PM, Jan Filipiak wrote:
>>>>>>>>>
>>>>>>>>>> Okay,
>>>>>>>>>>
>>>>>>>>>> looks like it would _at least work_ for cached KTableSources. But we make it easier for the user to make mistakes by putting features into places where they don't make sense and don't help anyone.
>>>>>>>>>>
>>>>>>>>>> I once again think that my suggestion is easier to implement and more correct. I will use this email to express my disagreement with the proposed KIP (-1, non-binding of course) and state that I am open to any questions regarding this. I will also do the usual thing and point out that the friends over at Hive got it correct as well. One cannot use their https://cwiki.apache.org/confluence/display/Hive/LanguageManual+VirtualColumns in any place where it is not read from the sources.
>>>>>>>>>>
>>>>>>>>>> With KSQL in mind, it makes me sad how this is evolving here.
>>>>>>>>>>
>>>>>>>>>> Best Jan
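To make the SourceKStream/SourceKTable proposal above concrete, here is a minimal sketch of how the two sub-types could look. The overload shapes and the RichPredicate/RichValueMapper names are assumptions for illustration only; the actual API would be whatever KIP-149/KIP-159 end up defining:

    // Hypothetical sub-interface returned by StreamsBuilder#stream.
    public interface SourceKStream<K, V> extends KStream<K, V> {

        // Single-record operations keep the SourceKStream type, so the
        // record context of the source topic is still well defined.
        SourceKStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);

        <VR> SourceKStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);

        // Multi-record operations (joins, aggregations) keep the plain
        // KStream/KTable return types inherited from KStream, which do not
        // expose a RecordContext.
    }

    // Analogous hypothetical sub-interface returned by StreamsBuilder#table.
    public interface SourceKTable<K, V> extends KTable<K, V> { /* analogous */ }

    // Hypothetical "rich" callbacks that additionally receive the record context.
    interface RichPredicate<K, V> {
        boolean test(K key, V value, RecordContext recordContext);
    }

    interface RichValueMapper<V, VR> {
        VR apply(V value, RecordContext recordContext);
    }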
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10.11.2017 01:06, Guozhang Wang wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Jan,
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question about caching: today we already keep the record context with the cached entry, so when we flush the cache, which may generate new records to forward, we will set the record context appropriately; and after the flush is completed we will reset the context to the record before the flush happened. But I think when Jeyhun does the PR it is a good time to double-check such stages to make sure we are not introducing any regressions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 6, 2017 at 8:54 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I agree completely.
>>>>>>>>>>>>
>>>>>>>>>>>> Exposing this information in a place where it has no _natural_ belonging might really be a bad blocker in the long run.
>>>>>>>>>>>>
>>>>>>>>>>>> Concerning your first point: I would argue it's not too hard to have a user keep track of these. If we still don't want the user to keep track of these, I would argue that all _projection only_ transformations on a source-backed KTable/KStream could also return a KTable/KStream instance of the type we return from the topology builder. Only an operation that exceeds projection or filter would return a KTable not granting access to this any longer.
>>>>>>>>>>>>
>>>>>>>>>>>> Even then it's difficult already: I never ran a topology with caching, but I am not even 100% sure what the record context means behind a materialized KTable with caching. Topic and partition can probably be justified with some reasoning, but the offset is probably only the offset causing the flush? So one might as well think of dropping offsets from this RecordContext.
>>>>>>>>>>>>
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 07.11.2017 03:18, Guozhang Wang wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the API design (the proposed set of overloads vs. one overload on #map to enrich the record), I think what we have represents a good trade-off between API succinctness and user convenience: on one hand we definitely want to keep as few overloaded functions as possible.
>>>>>>>>>>>>> But on the other hand, if we only do that in, say, the #map() function, then this enrichment could be overkill: think of a topology that has 7 operators in a chain, where users want to access the record context on operators #2 and #6 only; with the "enrichment" approach they need to do the enrichment on operator #2 and keep it that way until #6. In addition, the RecordContext fields (topic, offset, etc.) are really orthogonal to the key-value payloads themselves, so I think separating them into this object is a cleaner way.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the RecordContext inheritance, this is actually a good point that has not been discussed thoroughly before. Here are my two cents: one natural way would be to inherit the record context from the "triggering" record; for example in a join operator, if the record from stream A triggers the join, then the record context is inherited from that record. This is also aligned with the lower-level PAPI interface. A counter-argument, though, would be that this is sort of leaking the internal implementation of the DSL, so that moving forward, if we did some refactoring of our join implementations such that the triggering record can change, the RecordContext would also be different. I do not know how much it would really affect end users, but I would like to hear your opinions.
>>>>>>>>>>>>
>>>>>>>>>>>> Agreed 100% to exposing this information
>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry for the late reply.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In terms of API design we tried to preserve the Java functional interfaces. We applied the same set of rich methods to KTable to make it compatible with the rest of the overloaded APIs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It should be 100% sufficient to offer a KTable + KStream that is directly fed from a topic, with 1 additional overload for the #map() methods to cover every use case, while keeping the API in a way better state.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> IMO this seems a workaround, rather than a direct solution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps we should continue this discussion in the DISCUSS thread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
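As an aside, the "enrichment" alternative discussed in the mail above would look roughly like the following in user code. The three-argument #map overload and the Enriched wrapper are hypothetical, used here only to illustrate why the enrichment has to be dragged from operator #2 all the way to #6:

    // Hypothetical wrapper that carries record metadata alongside the value.
    class Enriched<V> {
        final V value;
        final String topic;
        final long offset;
        Enriched(final V value, final String topic, final long offset) {
            this.value = value;
            this.topic = topic;
            this.offset = offset;
        }
    }

    // Operator #2: enrich once, via a (hypothetical) single rich overload of #map.
    KStream<String, Enriched<String>> enriched = input.map(
        (key, value, recordContext) -> KeyValue.pair(
            key, new Enriched<>(value, recordContext.topic(), recordContext.offset())));

    // Operators #3..#5 must now all be typed against Enriched<String>,
    // even though only operator #6 actually needs the metadata again.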
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I do understand that it might come in handy. From my POV, in any relational algebra this is only a projection. Currently we hide these "fields" that come with the input record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It should be 100% sufficient to offer a KTable + KStream that is directly fed from a topic, with 1 additional overload for the #map() methods to cover every use case, while keeping the API in a way better state.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I understand what you are saying. However, having a RecordContext is super useful for operations that are applied to an input topic. Many users requested this feature -- it's much more convenient than falling back to transform() to implement, for example, a filter() that wants to access some metadata.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Because we cannot distinguish different "origins" of a KStream/KTable, I am not sure if there would be a better way to do this. The only "workaround" I see is to have two KStream/KTable interfaces each, and we would use the first one for KStream/KTable with a "proper" RecordContext. But this does not seem to be a good solution either.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note, a KTable can also be read directly from a topic. I agree that using RecordContext on a KTable that is the result of an aggregation is questionable. But I don't see a reason to down-vote the KIP for this reason.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> WDYT about this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -1 non-binding
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't get the motivation. In 80% of my DSL processors there is no such thing as a reasonable RecordContext. After a join, the record I am processing belongs to at least 2 topics. After a group-by, the record I am processing was created from multiple offsets.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
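For context, the transform() fallback Matthias mentions above looks roughly like this: a metadata-aware "filter" written against the 1.0-era Transformer API, where returning null from transform() drops the record (the topic name and String types are made up for illustration):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    KStream<String, String> filtered = input.transform(() ->
        new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
                // Keep only records read from the "orders" topic -- metadata
                // that a plain filter() cannot see.
                return context.topic().equals("orders") ? KeyValue.pair(key, value) : null;
            }

            @Override
            public KeyValue<String, String> punctuate(final long timestamp) {
                return null; // no scheduled output needed
            }

            @Override
            public void close() {}
        });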
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It seems the discussion for KIP-159 [1] finally converged. I would like to initiate voting for this particular KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jeyhun
>
> --
> -- Guozhang
--
-- Guozhang
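For readers landing on this vote thread: the rich functions proposed in KIP-159 essentially add overloads whose lambdas also receive the record metadata. A rough usage sketch (illustrative only; the exact interfaces and signatures are defined on the KIP wiki page linked above):

    // Today: the lambda only sees the value.
    stream.mapValues(value -> value.toUpperCase());

    // With a rich overload: the lambda additionally sees the record context.
    stream.mapValues((value, recordContext) ->
        value.toUpperCase() + " (from " + recordContext.topic() + "/"
            + recordContext.partition() + "@" + recordContext.offset() + ")");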