@Jeyhun,

I understand that the discussion about KIP-159 is dragging long, so while
we are moving on discussion for whether / hows of KIP-159, maybe you can
start implementing the non overlapping part of the APIs of KIP-149 to get
you unblocked?

Guozhang

On Sun, Nov 19, 2017 at 12:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jan: which approach are you referring to as "the approach that is on the
> table would be perfect"?
>
> Note that in today's PAPI layer we are already effectively exposing the
> record context which has the issues that we have been discussing right now,
> and its semantics is always referring to the "processing record" at hand.
> More specifically, we can think of processing a record a bit different:
>
> 1) the record traversed the topology from source to sink, it may be
> transformed into new object or even generate multiple new objects (think:
> branch) along the traversal. And the record context is referring to this
> processing record. Here the "lifetime" of the record lasts for the entire
> topology traversal and any new records of this traversal is treated as
> different transformed values of this record (this applies to join and
> aggregations as well).
>
> 2) the record being processed is wiped out in the first operator after the
> source, and NEW records are forwarded to downstream operators. I.e. each
> record only lives between two adjacent operators, once it reached the new
> operator it's lifetime has ended and new records are generated.
>
> I think in the past we have talked about Streams under both context, and
> we do not have a clear agreement. I agree that 2) is logically more
> understandable for users as it does not leak any internal implementation
> details (e.g. for stream-table joins, table record's traversal ends at the
> join operator as it is only be materialized, while stream record's
> traversal goes through the join operator to further down until sinks).
> However if we are going to interpret following 2) above then even for
> non-stateful operators we would not inherit record context. What we're
> discussing now, seems to infer a third semantics:
>
> 3) a record would traverse "through" one-to-one (non-stateful) operators,
> will "replicate" at one-to-many (non-stateful) operators (think:
> "mapValues" ) and will "end" at many-to-one (stateful) operators where
> NEW records will be generated and forwarded to the downstream operators.
>
> Just wanted to lay the ground for discussions so we are all on the same
> page before chatting more.
>
>
> Guozhang
>
>
> On Sat, Nov 18, 2017 at 3:10 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>> Hi,
>>
>>  not an issue at all. IMO
>> the approach that is on the table would be perfect
>>
>>
>> On 18.11.2017 10:58, Jeyhun Karimov wrote:
>>
>>> Hi,
>>>
>>> I did not expected that Context will be this much an issue. Instead of
>>> applying different semantics for different operators, I think we should
>>> remove this feature completely.
>>>
>>>
>>> Cheers,
>>> Jeyhun
>>> On Sat 18. Nov 2017 at 07:49, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>> Yes, the mail said only join so I wanted to clarify.
>>>>
>>>>
>>>>
>>>> On 17.11.2017 19:05, Matthias J. Sax wrote:
>>>>
>>>>> Yes. But I think an aggregation is an many-to-one operation, too.
>>>>>
>>>>> For the stripping off part: internally, we can just keep some record
>>>>> context, but just do not allow users to access it (because the context
>>>>> context does not make sense for them) by hiding the corresponding APIs.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 11/16/17 10:05 PM, Guozhang Wang wrote:
>>>>>
>>>>>> Matthias,
>>>>>>
>>>>>> For this idea, are your proposing that for any many-to-one mapping
>>>>>> operations (for now only Join operators), we will strip off the record
>>>>>> context in the resulted records and claim "we cannot infer its traced
>>>>>> context anymore"?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 16, 2017 at 1:03 PM, Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>> wrote:
>>>>>>
>>>>>> Any thoughts about my latest proposal?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 11/10/17 10:02 PM, Jan Filipiak wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> i think this is the better way. Naming is always tricky Source is
>>>>>>>>
>>>>>>> kinda
>>>>
>>>>> taken
>>>>>>>> I had TopicBackedK[Source|Table] in mind
>>>>>>>> but for the user its way better already IMHO
>>>>>>>>
>>>>>>>> Thank you for reconsideration
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>> On 10.11.2017 22:48, Matthias J. Sax wrote:
>>>>>>>>
>>>>>>>>> I was thinking about the source stream/table idea once more and it
>>>>>>>>>
>>>>>>>> seems
>>>>
>>>>> it would not be too hard to implement:
>>>>>>>>>
>>>>>>>>> We add two new classes
>>>>>>>>>
>>>>>>>>>      SourceKStream extends KStream
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>>      SourceKTable extend KTable
>>>>>>>>>
>>>>>>>>> and return both from StreamsBuilder#stream and StreamsBuilder#table
>>>>>>>>>
>>>>>>>>> As both are sub-classes, this change is backward compatible. We
>>>>>>>>>
>>>>>>>> change
>>>>
>>>>> the return type for any single-record transform to this new types,
>>>>>>>>>
>>>>>>>> too,
>>>>
>>>>> and use KStream/KTable as return type for any multi-record operation.
>>>>>>>>>
>>>>>>>>> The new RecordContext API is added to both new classes. For old
>>>>>>>>>
>>>>>>>> classes,
>>>>
>>>>> we only implement KIP-149 to get access to the key.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> WDYT?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 11/9/17 9:13 PM, Jan Filipiak wrote:
>>>>>>>>>
>>>>>>>>>> Okay,
>>>>>>>>>>
>>>>>>>>>> looks like it would _at least work_ for Cached KTableSources .
>>>>>>>>>> But we make it harder to the user to make mistakes by putting
>>>>>>>>>> features into places where they don't make sense and don't
>>>>>>>>>> help anyone.
>>>>>>>>>>
>>>>>>>>>> I once again think that my suggestion is easier to implement and
>>>>>>>>>> more correct. I will use this email to express my disagreement
>>>>>>>>>> with
>>>>>>>>>>
>>>>>>>>> the
>>>>
>>>>> proposed KIP (-1 non binding of course) state that I am open for any
>>>>>>>>>> questions
>>>>>>>>>> regarding this. I will also do the usual thing and point out that
>>>>>>>>>>
>>>>>>>>> the
>>>>
>>>>> friends
>>>>>>>>>> over at Hive got it correct aswell.
>>>>>>>>>> One can not user their
>>>>>>>>>> https://cwiki.apache.org/confluence/display/Hive/
>>>>>>>>>>
>>>>>>>>> LanguageManual+VirtualColumns
>>>>>>>
>>>>>>>> in any place where its not read from the Sources.
>>>>>>>>>>
>>>>>>>>>> With KSQl in mind it makes me sad how this is evolving here.
>>>>>>>>>>
>>>>>>>>>> Best Jan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10.11.2017 01:06, Guozhang Wang wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Jan,
>>>>>>>>>>>
>>>>>>>>>>> Regarding your question about caching: today we keep the record
>>>>>>>>>>>
>>>>>>>>>> context
>>>>>>>
>>>>>>>> with the cached entry already so when we flush the cache which may
>>>>>>>>>>> generate
>>>>>>>>>>> new records forwarding we will set the record context
>>>>>>>>>>>
>>>>>>>>>> appropriately;
>>>>
>>>>> and
>>>>>>>>>>> then after the flush is completed we will reset the context to
>>>>>>>>>>> the
>>>>>>>>>>> record
>>>>>>>>>>> before the flush happens. But I think when Jeyhun did the PR it
>>>>>>>>>>> is
>>>>>>>>>>>
>>>>>>>>>> a
>>>>
>>>>> good
>>>>>>>>>>> time to double check on such stages to make sure we are not
>>>>>>>>>>> introducing any
>>>>>>>>>>> regressions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 6, 2017 at 8:54 PM, Jan Filipiak <
>>>>>>>>>>>
>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I Aggree completely.
>>>>>>>>>>>>
>>>>>>>>>>>> Exposing this information in a place where it has no _natural_
>>>>>>>>>>>> belonging
>>>>>>>>>>>> might really be a bad blocker in the long run.
>>>>>>>>>>>>
>>>>>>>>>>>> Concerning your first point. I would argue its not to hard to
>>>>>>>>>>>>
>>>>>>>>>>> have a
>>>>
>>>>> user
>>>>>>>>>>>> keep track of these. If we still don't want the user
>>>>>>>>>>>> to keep track of these I would argue that all > projection only
>>>>>>>>>>>> <
>>>>>>>>>>>> transformations on a Source-backed KTable/KStream
>>>>>>>>>>>> could also return a Ktable/KStream instance of the type we
>>>>>>>>>>>> return
>>>>>>>>>>>> from the
>>>>>>>>>>>> topology builder.
>>>>>>>>>>>> Only after any operation that exceeds projection or filter one
>>>>>>>>>>>>
>>>>>>>>>>> would
>>>>
>>>>> return a KTable not granting access to this any longer.
>>>>>>>>>>>>
>>>>>>>>>>>> Even then its difficult already: I never ran a topology with
>>>>>>>>>>>>
>>>>>>>>>>> caching
>>>>
>>>>> but I
>>>>>>>>>>>> am not even 100% sure what the record Context means behind
>>>>>>>>>>>> a materialized KTable with Caching? Topic and Partition are
>>>>>>>>>>>>
>>>>>>>>>>> probably
>>>>
>>>>> with
>>>>>>>>>>>> some reasoning but offset is probably only the offset causing
>>>>>>>>>>>> the
>>>>>>>>>>>> flush?
>>>>>>>>>>>> So one might aswell think to drop offsets from this
>>>>>>>>>>>> RecordContext.
>>>>>>>>>>>>
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 07.11.2017 03:18, Guozhang Wang wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding the API design (the proposed set of overloads v.s. one
>>>>>>>>>>>>> overload
>>>>>>>>>>>>> on #map to enrich the record), I think what we have represents
>>>>>>>>>>>>> a
>>>>>>>>>>>>>
>>>>>>>>>>>> good
>>>>>>>
>>>>>>>> trade-off between API succinctness and user convenience: on one
>>>>>>>>>>>>> hand we
>>>>>>>>>>>>> definitely want to keep as fewer overloaded functions as
>>>>>>>>>>>>>
>>>>>>>>>>>> possible.
>>>>
>>>>> But on
>>>>>>>>>>>>> the other hand if we only do that in, say, the #map() function
>>>>>>>>>>>>>
>>>>>>>>>>>> then
>>>>
>>>>> this
>>>>>>>>>>>>> enrichment could be an overkill: think of a topology that has 7
>>>>>>>>>>>>> operators
>>>>>>>>>>>>> in a chain, where users want to access the record context on
>>>>>>>>>>>>> operator #2
>>>>>>>>>>>>> and #6 only, with the "enrichment" manner they need to do the
>>>>>>>>>>>>> enrichment
>>>>>>>>>>>>> on
>>>>>>>>>>>>> operator #2 and keep it that way until #6. In addition, the
>>>>>>>>>>>>> RecordContext
>>>>>>>>>>>>> fields (topic, offset, etc) are really orthogonal to the
>>>>>>>>>>>>>
>>>>>>>>>>>> key-value
>>>>
>>>>> payloads
>>>>>>>>>>>>> themselves, so I think separating them into this object is a
>>>>>>>>>>>>>
>>>>>>>>>>>> cleaner
>>>>
>>>>> way.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding the RecordContext inheritance, this is actually a
>>>>>>>>>>>>> good
>>>>>>>>>>>>> point
>>>>>>>>>>>>> that
>>>>>>>>>>>>> have not been discussed thoroughly before. Here are my my two
>>>>>>>>>>>>>
>>>>>>>>>>>> cents:
>>>>
>>>>> one
>>>>>>>>>>>>> natural way would be to inherit the record context from the
>>>>>>>>>>>>> "triggering"
>>>>>>>>>>>>> record, for example in a join operator, if the record from
>>>>>>>>>>>>>
>>>>>>>>>>>> stream A
>>>>
>>>>> triggers the join then the record context is inherited from with
>>>>>>>>>>>>>
>>>>>>>>>>>> that
>>>>>>>
>>>>>>>> record. This is also aligned with the lower-level PAPI
>>>>>>>>>>>>>
>>>>>>>>>>>> interface. A
>>>>
>>>>> counter
>>>>>>>>>>>>> argument, though, would be that this is sort of leaking the
>>>>>>>>>>>>>
>>>>>>>>>>>> internal
>>>>
>>>>> implementations of the DSL, so that moving forward if we did some
>>>>>>>>>>>>> refactoring to our join implementations so that the triggering
>>>>>>>>>>>>> record can
>>>>>>>>>>>>> change, the RecordContext would also be different. I do not
>>>>>>>>>>>>> know
>>>>>>>>>>>>>
>>>>>>>>>>>> how
>>>>
>>>>> much
>>>>>>>>>>>>> it would really affect end users, but would like to hear your
>>>>>>>>>>>>> opinions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Agreed to 100% exposing this information
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <
>>>>>>>>>>>>>
>>>>>>>>>>>> je.kari...@gmail.com>
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sorry for late reply.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In terms of API design we tried to preserve the java
>>>>>>>>>>>>>> functional
>>>>>>>>>>>>>> interfaces.
>>>>>>>>>>>>>> We applied the same set of rich methods for KTable to make it
>>>>>>>>>>>>>> compatible
>>>>>>>>>>>>>> with the rest of overloaded APIs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It should be 100% sufficient to offer a KTable + KStream that
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>> directly
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> feed from a topic with 1 additional overload for the #map()
>>>>>>>>>>>>>>> methods to
>>>>>>>>>>>>>>> cover every usecase while keeping the API in a way better
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> state.
>>>>
>>>>> - IMO this seems a workaround, rather than a direct solution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Perhaps we should continue this discussion in DISCUSS thread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak
>>>>>>>>>>>>>> <jan.filip...@trivago.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I do understand that it might come in Handy.
>>>>>>>>>>>>>>>       From my POV in any relational algebra this is only a
>>>>>>>>>>>>>>> projection.
>>>>>>>>>>>>>>> Currently we hide these "fields" that come with the input
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> record.
>>>>
>>>>> It should be 100% sufficient to offer a KTable + KStream that
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> is
>>>>
>>>>> directly
>>>>>>>>>>>>>>> feed from a topic with 1 additional overload for the #map()
>>>>>>>>>>>>>>> methods to
>>>>>>>>>>>>>>> cover every usecase while keeping the API in a way better
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> state.
>>>>
>>>>> best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jan,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I understand what you are saying. However, having a
>>>>>>>>>>>>>>>> RecordContext is
>>>>>>>>>>>>>>>> super useful for operations that are applied to input topic.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Many
>>>>
>>>>> users
>>>>>>>>>>>>>>>> requested this feature -- it's much more convenient that
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> falling
>>>>
>>>>> back
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> transform() to implement a a filter() for example that want
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> access
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> some meta data.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Because we cannot distinguish different "origins" of a
>>>>>>>>>>>>>>>> KStream/KTable,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> am not sure if there would be a better way to do this. The
>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "workaround" I see, is to have two KStream/KTable interfaces
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> each
>>>>
>>>>> and
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> would use the first one for KStream/KTable with "proper"
>>>>>>>>>>>>>>> RecordContext.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But this does not seem to be a good solution either.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note, a KTable can also be read directly from a topic, I
>>>>>>>>>>>>>>>> agree
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> using RecordContext on a KTable that is the result of an
>>>>>>>>>>>>>>>> aggregation is
>>>>>>>>>>>>>>>> questionable. But I don't see a reason to down vote the KIP
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> for
>>>>
>>>>> this
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> reason.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WDYT about this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -1 non binding
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I don't get the motivation.
>>>>>>>>>>>>>>>>> In 80% of my DSL processors there is no such thing as a
>>>>>>>>>>>>>>>>> reasonable
>>>>>>>>>>>>>>>>> RecordContext.
>>>>>>>>>>>>>>>>> After a join  the record I am processing belongs to at
>>>>>>>>>>>>>>>>> least
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2
>>>>
>>>>> topics.
>>>>>>>>>>>>>>>>> After a Group by the record I am processing was created
>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>> offsets.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The API Design doesn't look appealing
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Dear community,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It seems the discussion for KIP-159 [1] converged
>>>>>>>>>>>>>>>>>> finally. I
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>> like to
>>>>>>>>>>>>>>>>>> initiate voting for the particular KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Jeyhun
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>
>>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Reply via email to