Jan: which approach are you referring to as "the approach that is on the table would be perfect"?
Note that in today's PAPI layer we are already effectively exposing the record context, which has the issues we have been discussing, and its semantics always refer to the "processing record" at hand. More specifically, we can think of processing a record in two different ways:

1) The record traverses the topology from source to sink; it may be transformed into a new object or even generate multiple new objects (think: branch) along the traversal, and the record context refers to this processing record. Here the "lifetime" of the record lasts for the entire topology traversal, and any new records of this traversal are treated as different transformed values of this record (this applies to joins and aggregations as well).

2) The record being processed is wiped out in the first operator after the source, and NEW records are forwarded to downstream operators. I.e. each record only lives between two adjacent operators; once it reaches the next operator its lifetime has ended and new records are generated.

I think in the past we have talked about Streams under both contexts, and we do not have a clear agreement. I agree that 2) is logically more understandable for users as it does not leak any internal implementation details (e.g. for stream-table joins, the table record's traversal ends at the join operator as it is only materialized, while the stream record's traversal goes through the join operator and further down to the sinks). However, if we interpret things following 2) above, then even for non-stateful operators we would not inherit the record context. What we are discussing now seems to imply a third semantics:

3) A record traverses "through" one-to-one (non-stateful) operators, is "replicated" at one-to-many (non-stateful) operators (think: "flatMapValues") and "ends" at many-to-one (stateful) operators, where NEW records are generated and forwarded to the downstream operators.
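To make semantics 3) a bit more concrete, here is a minimal sketch in plain Java. Everything in it is a hypothetical stand-in for illustration (the classes Ctx and Rec, and the methods mapOne, splitWords and countAll); none of it is Kafka Streams API, and a null context is only my assumption for modelling "we cannot infer the traced context anymore".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final class RecordContextSketch {

    /** Stand-in for the metadata a record context would carry. */
    static final class Ctx {
        final String topic;
        final long offset;

        Ctx(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    /** A value moving through the topology; ctx == null means "no context can be inferred". */
    static final class Rec {
        final String value;
        final Ctx ctx;

        Rec(String value, Ctx ctx) {
            this.value = value;
            this.ctx = ctx;
        }
    }

    /** One-to-one, non-stateful: the record passes "through" and keeps its context. */
    static Rec mapOne(Rec in) {
        return new Rec(in.value.toUpperCase(), in.ctx);
    }

    /** One-to-many, non-stateful: the context is replicated onto every output record. */
    static List<Rec> splitWords(Rec in) {
        List<Rec> out = new ArrayList<>();
        for (String word : in.value.split(" ")) {
            out.add(new Rec(word, in.ctx));
        }
        return out;
    }

    /** Many-to-one, stateful: the inputs "end" here and a NEW record without context is emitted. */
    static Rec countAll(List<Rec> inputs) {
        return new Rec(Integer.toString(inputs.size()), null);
    }

    public static void main(String[] args) {
        Rec source = new Rec("a b", new Ctx("input-topic", 42L));
        Rec mapped = mapOne(source);          // context inherited
        List<Rec> words = splitWords(mapped); // context replicated
        Rec total = countAll(words);          // context dropped
        System.out.println("mapped keeps context: " + (mapped.ctx == source.ctx));
        System.out.println("aggregate has context: " + (total.ctx != null));
    }
}
```

Under semantics 1) countAll would instead copy the context of whichever input triggered the output, and under semantics 2) even mapOne would return a record without context.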

Just wanted to lay the ground for discussions so we are all on the same page before chatting more.

Guozhang

On Sat, Nov 18, 2017 at 3:10 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi,
>
> not an issue at all. IMO
> the approach that is on the table would be perfect
>
> On 18.11.2017 10:58, Jeyhun Karimov wrote:
>
>> Hi,
>>
>> I did not expect that Context will be this much an issue. Instead of
>> applying different semantics for different operators, I think we should
>> remove this feature completely.
>>
>> Cheers,
>> Jeyhun
>>
>> On Sat 18. Nov 2017 at 07:49, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>
>>> Yes, the mail said only join so I wanted to clarify.
>>>
>>> On 17.11.2017 19:05, Matthias J. Sax wrote:
>>>
>>>> Yes. But I think an aggregation is a many-to-one operation, too.
>>>>
>>>> For the stripping off part: internally, we can just keep some record
>>>> context, but just do not allow users to access it (because the
>>>> context does not make sense for them) by hiding the corresponding APIs.
>>>>
>>>> -Matthias
>>>>
>>>> On 11/16/17 10:05 PM, Guozhang Wang wrote:
>>>>
>>>>> Matthias,
>>>>>
>>>>> For this idea, are you proposing that for any many-to-one mapping
>>>>> operations (for now only Join operators), we will strip off the record
>>>>> context in the resulting records and claim "we cannot infer its traced
>>>>> context anymore"?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Thu, Nov 16, 2017 at 1:03 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Any thoughts about my latest proposal?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 11/10/17 10:02 PM, Jan Filipiak wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> i think this is the better way.
>>>>>>> Naming is always tricky. Source is kinda taken.
>>>>>>> I had TopicBackedK[Source|Table] in mind
>>>>>>> but for the user it's way better already IMHO
>>>>>>>
>>>>>>> Thank you for reconsideration
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>> On 10.11.2017 22:48, Matthias J. Sax wrote:
>>>>>>>
>>>>>>>> I was thinking about the source stream/table idea once more and it
>>>>>>>> seems it would not be too hard to implement:
>>>>>>>>
>>>>>>>> We add two new classes
>>>>>>>>
>>>>>>>>   SourceKStream extends KStream
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>>   SourceKTable extends KTable
>>>>>>>>
>>>>>>>> and return both from StreamsBuilder#stream and StreamsBuilder#table.
>>>>>>>>
>>>>>>>> As both are sub-classes, this change is backward compatible. We
>>>>>>>> change the return type for any single-record transform to these new
>>>>>>>> types, too, and use KStream/KTable as return type for any
>>>>>>>> multi-record operation.
>>>>>>>>
>>>>>>>> The new RecordContext API is added to both new classes. For old
>>>>>>>> classes, we only implement KIP-149 to get access to the key.
>>>>>>>>
>>>>>>>> WDYT?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 11/9/17 9:13 PM, Jan Filipiak wrote:
>>>>>>>>
>>>>>>>>> Okay,
>>>>>>>>>
>>>>>>>>> looks like it would _at least work_ for Cached KTableSources.
>>>>>>>>> But we make it harder to the user to make mistakes by putting
>>>>>>>>> features into places where they don't make sense and don't
>>>>>>>>> help anyone.
>>>>>>>>>
>>>>>>>>> I once again think that my suggestion is easier to implement and
>>>>>>>>> more correct. I will use this email to express my disagreement with
>>>>>>>>> the proposed KIP (-1 non binding of course) and state that I am
>>>>>>>>> open for any questions regarding this.
>>>>>>>>> I will also do the usual thing and point out that the friends
>>>>>>>>> over at Hive got it correct as well.
>>>>>>>>> One cannot use their
>>>>>>>>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+VirtualColumns
>>>>>>>>> in any place where it's not read from the Sources.
>>>>>>>>>
>>>>>>>>> With KSQL in mind it makes me sad how this is evolving here.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 10.11.2017 01:06, Guozhang Wang wrote:
>>>>>>>>>
>>>>>>>>>> Hello Jan,
>>>>>>>>>>
>>>>>>>>>> Regarding your question about caching: today we keep the record
>>>>>>>>>> context with the cached entry already, so when we flush the cache,
>>>>>>>>>> which may generate new records to forward, we will set the record
>>>>>>>>>> context appropriately; and then after the flush is completed we
>>>>>>>>>> will reset the context to the record before the flush happens. But
>>>>>>>>>> I think when Jeyhun does the PR it is a good time to double check
>>>>>>>>>> on such stages to make sure we are not introducing any regressions.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 6, 2017 at 8:54 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I agree completely.
>>>>>>>>>>>
>>>>>>>>>>> Exposing this information in a place where it has no _natural_
>>>>>>>>>>> belonging might really be a bad blocker in the long run.
>>>>>>>>>>>
>>>>>>>>>>> Concerning your first point. I would argue it's not too hard to
>>>>>>>>>>> have a user keep track of these.
>>>>>>>>>>> If we still don't want the user
>>>>>>>>>>> to keep track of these I would argue that all > projection only <
>>>>>>>>>>> transformations on a Source-backed KTable/KStream
>>>>>>>>>>> could also return a KTable/KStream instance of the type we return
>>>>>>>>>>> from the topology builder.
>>>>>>>>>>> Only after any operation that exceeds projection or filter one
>>>>>>>>>>> would return a KTable not granting access to this any longer.
>>>>>>>>>>>
>>>>>>>>>>> Even then it's difficult already: I never ran a topology with
>>>>>>>>>>> caching but I am not even 100% sure what the record Context means
>>>>>>>>>>> behind a materialized KTable with Caching? Topic and Partition are
>>>>>>>>>>> probably with some reasoning but offset is probably only the
>>>>>>>>>>> offset causing the flush?
>>>>>>>>>>> So one might as well think to drop offsets from this
>>>>>>>>>>> RecordContext.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 07.11.2017 03:18, Guozhang Wang wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding the API design (the proposed set of overloads vs. one
>>>>>>>>>>>> overload on #map to enrich the record), I think what we have
>>>>>>>>>>>> represents a good trade-off between API succinctness and user
>>>>>>>>>>>> convenience: on one hand we definitely want to keep as few
>>>>>>>>>>>> overloaded functions as possible.
>>>>>>>>>>>> But on the other hand if we only do that in, say, the #map()
>>>>>>>>>>>> function then this enrichment could be an overkill: think of a
>>>>>>>>>>>> topology that has 7 operators in a chain, where users want to
>>>>>>>>>>>> access the record context on operator #2 and #6 only; with the
>>>>>>>>>>>> "enrichment" manner they need to do the enrichment on operator #2
>>>>>>>>>>>> and keep it that way until #6. In addition, the RecordContext
>>>>>>>>>>>> fields (topic, offset, etc) are really orthogonal to the
>>>>>>>>>>>> key-value payloads themselves, so I think separating them into
>>>>>>>>>>>> this object is a cleaner way.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding the RecordContext inheritance, this is actually a good
>>>>>>>>>>>> point that has not been discussed thoroughly before. Here are my
>>>>>>>>>>>> two cents: one natural way would be to inherit the record context
>>>>>>>>>>>> from the "triggering" record, for example in a join operator, if
>>>>>>>>>>>> the record from stream A triggers the join then the record
>>>>>>>>>>>> context is inherited from that record. This is also aligned with
>>>>>>>>>>>> the lower-level PAPI interface. A counter argument, though, would
>>>>>>>>>>>> be that this is sort of leaking the internal implementations of
>>>>>>>>>>>> the DSL, so that moving forward if we did some refactoring to our
>>>>>>>>>>>> join implementations so that the triggering record can change,
>>>>>>>>>>>> the RecordContext would also be different. I do not know how much
>>>>>>>>>>>> it would really affect end users, but would like to hear your
>>>>>>>>>>>> opinions.
>>>>>>>>>>>>
>>>>>>>>>>> Agreed to 100% exposing this information
>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <je.kari...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sorry for late reply.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>
>>>>>>>>>>>>> In terms of API design we tried to preserve the java functional
>>>>>>>>>>>>> interfaces. We applied the same set of rich methods for KTable
>>>>>>>>>>>>> to make it compatible with the rest of overloaded APIs.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> It should be 100% sufficient to offer a KTable + KStream that
>>>>>>>>>>>>>> is directly fed from a topic with 1 additional overload for
>>>>>>>>>>>>>> the #map() methods to cover every usecase while keeping the
>>>>>>>>>>>>>> API in a way better state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - IMO this seems a workaround, rather than a direct solution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Perhaps we should continue this discussion in DISCUSS thread.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I do understand that it might come in handy.
>>>>>>>>>>>>>> From my POV in any relational algebra this is only a
>>>>>>>>>>>>>> projection.
>>>>>>>>>>>>>> Currently we hide these "fields" that come with the input
>>>>>>>>>>>>>> record.
>>>>>>>>>>>>>> It should be 100% sufficient to offer a KTable + KStream that
>>>>>>>>>>>>>> is directly fed from a topic with 1 additional overload for
>>>>>>>>>>>>>> the #map() methods to cover every usecase while keeping the
>>>>>>>>>>>>>> API in a way better state.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> best Jan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I understand what you are saying. However, having a
>>>>>>>>>>>>>>> RecordContext is super useful for operations that are applied
>>>>>>>>>>>>>>> to input topic. Many users requested this feature -- it's
>>>>>>>>>>>>>>> much more convenient than falling back to transform() to
>>>>>>>>>>>>>>> implement a filter(), for example, that wants to access some
>>>>>>>>>>>>>>> meta data.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Because we cannot distinguish different "origins" of a
>>>>>>>>>>>>>>> KStream/KTable, I am not sure if there would be a better way
>>>>>>>>>>>>>>> to do this. The only "workaround" I see is to have two
>>>>>>>>>>>>>>> KStream/KTable interfaces each, and we would use the first
>>>>>>>>>>>>>>> one for KStream/KTable with "proper" RecordContext.
>>>>>>>>>>>>>>> But this does not seem to be a good solution either.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note, a KTable can also be read directly from a topic. I
>>>>>>>>>>>>>>> agree that using RecordContext on a KTable that is the result
>>>>>>>>>>>>>>> of an aggregation is questionable.
>>>>>>>>>>>>>>> But I don't see a reason to down vote the KIP for this
>>>>>>>>>>>>>>> reason.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WDYT about this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -1 non binding
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I don't get the motivation.
>>>>>>>>>>>>>>>> In 80% of my DSL processors there is no such thing as a
>>>>>>>>>>>>>>>> reasonable RecordContext.
>>>>>>>>>>>>>>>> After a join the record I am processing belongs to at least
>>>>>>>>>>>>>>>> 2 topics.
>>>>>>>>>>>>>>>> After a Group by the record I am processing was created from
>>>>>>>>>>>>>>>> multiple offsets.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems the discussion for KIP-159 [1] converged finally.
>>>>>>>>>>>>>>>>> I would like to initiate voting for the particular KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Jeyhun

--
-- Guozhang
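
For reference, the source stream/table idea quoted in this thread (SourceKStream extends KStream, with single-record transforms keeping the source type and multi-record operations returning the plain type) could be sketched roughly as below. This is only a toy model in plain Java under my own assumptions: the classes Stream, SourceStream and Ctx and the methods mapValue, mergeWith, recordContext and stream are hypothetical stand-ins, a "stream" is reduced to a single value, and nothing here is the actual Kafka Streams API.

```java
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
final class SourceStreamSketch {

    /** Stand-in for record metadata. */
    static final class Ctx {
        final String topic;
        final long offset;

        Ctx(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    /** Stand-in for the plain stream type: no access to any record context. */
    static class Stream {
        final String value;

        Stream(String value) {
            this.value = value;
        }

        /** Single-record transform on the plain type. */
        Stream mapValue(Function<String, String> f) {
            return new Stream(f.apply(value));
        }

        /** Multi-record operation: always returns the plain type. */
        Stream mergeWith(Stream other) {
            return new Stream(value + other.value);
        }
    }

    /** Stand-in for the source-backed subtype: additionally exposes the context. */
    static final class SourceStream extends Stream {
        private final Ctx ctx;

        SourceStream(String value, Ctx ctx) {
            super(value);
            this.ctx = ctx;
        }

        /** Covariant override: a single-record transform keeps the source type. */
        @Override
        SourceStream mapValue(Function<String, String> f) {
            return new SourceStream(f.apply(value), ctx);
        }

        Ctx recordContext() {
            return ctx;
        }
    }

    /** Stand-in for the builder method that reads from a topic. */
    static SourceStream stream(String topic, String value, long offset) {
        return new SourceStream(value, new Ctx(topic, offset));
    }

    public static void main(String[] args) {
        SourceStream source = stream("input-topic", "a", 7L);
        SourceStream mapped = source.mapValue(v -> v + "!");
        System.out.println(mapped.recordContext().offset);

        Stream legacy = source; // existing code typed against the base class still compiles
        Stream merged = mapped.mergeWith(new Stream("b"));
        System.out.println(merged.value + " / " + legacy.value);
        // merged.recordContext(); // would not compile: context is gone after a multi-record op
    }
}
```

The point of the design is that the compiler, rather than runtime checks, stops users from asking for a context after a multi-record operation.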