Thanks Jorge, that sounds good to me.

Also please feel free to send out the PR for reviews while the KIP is being
voted on.


Guozhang


On Mon, May 14, 2018 at 8:31 AM, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks for your feedback everyone!
>
> If there are no more comments on this KIP, I think we can open the VOTE
> thread.
>
> Cheers,
> Jorge.
>
> On Sat, May 12, 2018 at 2:02, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Yeah I'm only talking about the DSL part (i.e. how stateful / stateless
> > operators default inheritance protocol would be promised) to be managed
> > with KIP-159.
> >
> > For allowing users to override the default behavior in PAPI, that would
> be
> > in a different KIP.
> >
> >
> > Guozhang
> >
> >
> > On Fri, May 11, 2018 at 10:41 AM, Matthias J. Sax <matth...@confluent.io
> >
> > wrote:
> >
> > > I am actually not sure about this. Because it's about the semantics at
> > > PAPI level, but KIP-159 targets the DSL, it might actually be better to
> > > have a separate KIP?
> > >
> > > -Matthias
> > >
> > > On 5/11/18 9:26 AM, Guozhang Wang wrote:
> > > > That's a good question. I think we can manage this in KIP-159. I will
> > go
> > > > ahead and try to augment that KIP together with the original author
> > > Jeyhun.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, May 11, 2018 at 12:45 AM, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jo...@gmail.com> wrote:
> > > >
> > > >> Thanks Guozhang and Matthias! I do also agree with this way of
> > handling
> > > >> headers inheritance. I will add them to the KIP doc.
> > > >>
> > > >>> We can discuss about extending the current protocol and how to
> enable
> > > >> users
> > > > >>> override those rules, and how to expose them in the DSL layer in a
> > > future
> > > >>> KIP.
> > > >>
> > > >> About this, should this be managed on KIP-159 or a new one?
> > > >>
> > > > >> On Thu, May 10, 2018 at 17:46, Matthias J. Sax <matth...@confluent.io>
> > > > >> wrote:
> > > >>
> > > >>> Thanks Guozhang! Sounds good to me!
> > > >>>
> > > >>> -Matthias
> > > >>>
> > > >>> On 5/10/18 7:55 AM, Guozhang Wang wrote:
> > > >>>> Thanks for your thoughts Matthias. I think if we do want to bring
> > > >> KIP-244
> > > >>>> into 2.0 then we need to keep its scope small and well defined.
> For
> > > >> that
> > > >>>> I'm proposing:
> > > >>>>
> > > >>>> 1. Make the inheritance implementation of headers consistent with
> > what
> > > >> we
> > > >>>> had with other record context fields. I.e. pass through the record
> > > >>> context
> > > >>>> in `context.forward()`. Note that within a processor node, users
> can
> > > >>>> already manipulate the Headers with the given APIs, so at the time
> > of
> > > >>>> forwarding, the library can just copy what-ever is left / updated
> to
> > > >> the
> > > >>>> next processor node.
> > > >>>>
> > > >>>> 2. In the sink node, where a record is being sent to the Kafka
> > topic,
> > > >> we
> > > >>>> should consider the following:
> > > >>>>
> > > >>>> a. For sink topics, we will set the headers into the producer
> > record.
> > > > >>>> b. For repartition topics, we will set the headers into the producer
> > > >> record.
> > > > >>>> c. For changelog topics, we will drop the headers in the producer
> > > record
> > > >>>> since they will not be used in restoration and not stored in the
> > state
> > > >>>> store either.
> > > >>>>
> > > >>>>
> > > >>>> We can discuss about extending the current protocol and how to
> > enable
> > > >>> users
> > > > >>>> override those rules, and how to expose them in the DSL layer in a
> > > >> future
> > > >>>> KIP.
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> Guozhang
> > > >>>>
> > > >>>>
> > > >>>> On Mon, May 7, 2018 at 5:49 PM, Matthias J. Sax <
> > > matth...@confluent.io
> > > >>>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Guozhang,
> > > >>>>>
> > > >>>>> if you advocate to forward headers by default, it might be a
> better
> > > > >>>>> default strategy to forward the headers for all operators
> (similar
> > to
> > > >>>>> topic/partition/offset metadata). It's usually harder for users
> to
> > > >>>>> reason about different cases and thus I would prefer to have
> > > >> consistent
> > > >>>>> behavior, ie, only one default strategy instead of introducing
> > > >> different
> > > >>>>> cases.
> > > >>>>>
> > > >>>>> Btw: My argument about dropping headers by default only implies,
> > that
> > > >>>>> users need to copy the headers explicitly to the output records
> in
> > > >> their
> > > > >>>>> code if they want to inspect them later -- it does not imply that
> > > >>>>> headers cannot be forwarded downstream. (Not sure if this was
> > clear).
> > > >>>>>
> > > > >>>>> I am also ok with copying by default though (for me, it's a
> 51/49
> > > >>>>> preference for dropping by default only).
> > > >>>>>
> > > >>>>>
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>> On 5/7/18 4:52 PM, Guozhang Wang wrote:
> > > >>>>>> Hi Matthias,
> > > >>>>>>
> > > >>>>>> My concern of setting `null` in all cases is that it would make
> > > >> headers
> > > >>>>> not
> > > >>>>>> very useful in KIP-244 then, because headers will only be
> > available
> > > >> at
> > > >>>>> the
> > > >>>>>> source stream / table, but not in any of the following
> instances.
> > In
> > > >>>>>> practice users may be more likely to look into the headers later
> > in
> > > >> the
> > > >>>>>> pipeline. Personally I'd suggest we pass the headers for all
> > > >> stateless
> > > >>>>>> operators in DSL and everywhere in PAPI's context.forward(). For
> > > >>>>>> repartition topics and sink topics, we also set them in the
> > produced
> > > >>>>>> records accordingly; for changelog topics, we do not set them
> > since
> > > >>> they
> > > >>>>>> are not going to be used anywhere in the store.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Guozhang
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Sun, May 6, 2018 at 9:03 PM, Matthias J. Sax <
> > > >> matth...@confluent.io
> > > >>>>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> I agree, that we should not block this KIP if possible.
> > > >> Nevertheless,
> > > >>> we
> > > >>>>>>> should try to get a reasonable default strategy for inheriting
> > the
> > > >>>>>>> headers so we don't need to change it later on.
> > > >>>>>>>
> > > > >>>>>>> Let's see what others think. I still tend slightly to set to
> > `null`
> > > >> by
> > > >>>>>>> default for all cases. If the default strategy is different for
> > > > >>>>>>> different operators as you suggest, it might be confusing to
> > users.
> > > >>>>>>> IMHO, the default behavior should be as simple as possible.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> -Matthias
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On 5/6/18 8:53 PM, Guozhang Wang wrote:
> > > >>>>>>>> Matthias, thanks for sharing your opinions in the inheritance
> > > >>> protocol
> > > >>>>> of
> > > >>>>>>>> the record context. I'm thinking maybe we should make this
> > > >> discussion
> > > >>>>> as
> > > >>>>>>> a
> > > >>>>>>>> separate KIP by itself? If yes, then KIP-244's scope would be
> > > >>> smaller,
> > > >>>>>>> and
> > > >>>>>>>> within KIP-244 we can have a simple inheritance rule that
> > setting
> > > >> it
> > > >>> to
> > > >>>>>>>> null when 1) going through stateful operators and 2) sending
> to
> > > any
> > > >>>>>>> topics.
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>> Guozhang
> > > >>>>>>>>
> > > >>>>>>>> On Sun, May 6, 2018 at 10:24 AM, Matthias J. Sax <
> > > >>>>> matth...@confluent.io>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Making the inheritance protocol a public contract seems
> > > reasonable
> > > >>> to
> > > >>>>>>> me.
> > > >>>>>>>>>
> > > > >>>>>>>>> In the current implementation, all output records inherit
> the
> > > >>> offset,
> > > >>>>>>>>> timestamp, topic, and partition metadata from the input
> record.
> > > We
> > > >>>>>>>>> already added an API to change the timestamp explicitly for
> the
> > > >>> output
> > > > >>>>>>>>> record though.
> > > >>>>>>>>>
> > > > >>>>>>>>> I think it makes sense to keep the inheritance of offset,
> topic,
> > > >> and
> > > >>>>>>>>> partition. For headers, it's worth to discuss. I see
> arguments
> > > for
> > > >>> two
> > > >>>>>>>>> strategies: (1) inherit by default, (2) set `null` by
> default.
> > > >>>>>>>>> Independent of the default behavior, we should add an API to
> > set
> > > >>>>> headers
> > > >>>>>>>>> for output records explicitly though (similar to the "set
> > > >> timestamp
> > > >>>>>>> API").
> > > >>>>>>>>>
> > > >>>>>>>>> From my point of view, timestamp/headers are a different
> > > >>>>>>>>> "class/category" of data/metadata than
> topic/partition/offset.
> > > For
> > > >>> the
> > > >>>>>>>>> first category, it makes sense to manipulate them and it's
> more
> > > >> than
> > > >>>>>>>>> "plain metadata"; especially the timestamp. For the second
> > > >> category
> > > >>> it
> > > >>>>>>>>> does not make sense to manipulate it, and to me
> > > >>> topic/partition/offset
> > > >>>>>>>>> is pure metadata only---strictly speaking, it's even
> > questionable
> > > >> if
> > > >>>>>>>>> output records should have any value for
> topic/partition/offset
> > > in
> > > >>> the
> > > >>>>>>>>> first place, or if they should be `null`, because those
> > > attributes
> > > >>> do
> > > >>>>>>>>> only make sense for source records that are consumed from a
> > topic
> > > >>>>>>>>> directly only. On the other hand, if we make this difference
> > > >>> explicit,
> > > > >>>>>>>>> it might be useful information for the user to track the
> current
> > > >>>>>>>>> topic/partition/offset of the original source record.
> > > >>>>>>>>>
> > > >>>>>>>>> Furthermore, to me, timestamps and headers are somewhat
> > > different,
> > > >>>>> too.
> > > >>>>>>>>> For stream processing it's required that every record has a
> > > >>> timestamp;
> > > > >>>>>>>>> thus, it makes sense to inherit the input record timestamp by
> > > >> default
> > > >>>>> (a
> > > >>>>>>>>> timestamp is not really metadata but actually equally
> important
> > > to
> > > >>> key
> > > > >>>>>>>>> and value from my point of view). Headers, however, are
> optional,
> > > and
> > > >>>>> thus
> > > >>>>>>>>> inheriting them is not really required. It might be
> convenient
> > > >>> though:
> > > >>>>>>>>> for example, imagine a simple "filter-only" application -- it
> > > >> would
> > > >>> be
> > > >>>>>>>>> cumbersome for users to explicitly copy the headers from the
> > > input
> > > >>>>>>>>> records to the output records -- it seems to be unnecessary
> > > >>>>> boilerplate
> > > >>>>>>>>> code. On the other hand, for any other more complex use case,
> > > it's
> > > >>>>>>>>> questionable to inherit headers---note, that headers would be
> > > >>> written
> > > >>>>> to
> > > >>>>>>>>> the output topics increasing the size of the messages.
> > Overall, I
> > > >> am
> > > >>>>> not
> > > >>>>>>>>> sure which default strategy might be the better one for
> > headers.
> > > >> Is
> > > >>>>>>>>> there a convincing argument for either one of them? I
> slightly
> > > >> tend
> > > >>> to
> > > >>>>>>>>> think that using `null` as default might be better.
> > > >>>>>>>>>
> > > >>>>>>>>> Last, we could also make the default behavior configurable.
> > > >>> Something
> > > >>>>>>>>> like `inherit.record.headers=true/false` with default value
> > > >> "false".
> > > >>>>>>>>> This would allow people to opt-in for
> auto-header-inheritance.
> > > >> Just
> > > >>> an
> > > >>>>>>>>> idea I wanted to add to the discussion---not sure if it's a
> > good
> > > >>> one.
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> -Matthias
> > > >>>>>>>>>
> > > >>>>>>>>> On 5/4/18 3:13 PM, Guozhang Wang wrote:
> > > >>>>>>>>>> Hello Jorge,
> > > >>>>>>>>>>
> > > > >>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part
> of
> > > >>>>>>>>> `RecordContext`
> > > >>>>>>>>>> would be handled the same way as other attributes.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Today we do not have a clear inheritance protocol for other
> > > >> fields
> > > >>> of
> > > >>>>>>>>>> RecordContext yet: although internally we do have some
> > criterion
> > > >> on
> > > >>>>>>>>>> topic/partition/offset and timestamp, they are not
> explicitly
> > > >>> exposed
> > > >>>>>>> to
> > > >>>>>>>>>> users.
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> I think we still need to have a defined protocol for headers
> > > >>> itself,
> > > >>>>>>> but
> > > >>>>>>>>> I
> > > > >>>>>>>>>> agree that it is better scoped outside of this KIP,
> since
> > > >> this
> > > >>>>>>>>>> inheritance protocol itself for all the fields of
> > RecordContext
> > > >>> would
> > > >>>>>>>>>> better be a separate KIP. We can document this clearly in
> the
> > > >> wiki
> > > >>>>>>> page.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Guozhang
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
> > > >>>>>>>>>> garcia.florian.pe...@gmail.com> wrote:
> > > >>>>>>>>>>
> > > >>>>>>>>>>> Hi,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> For me this is a great first step to have Headers in
> > streaming.
> > > >>>>>>>>>>> My current use case is about distributed tracing (Zipkin)
> and
> > > >> with
> > > >>>>> the
> > > >>>>>>>>>>> headers in the processorContext() I'll be able to manage
> that
> > > >> for
> > > >>>>> the
> > > >>>>>>>>> most
> > > >>>>>>>>>>> cases.
> > > >>>>>>>>>>> The KIP-159 should follow after this but this is where all
> > the
> > > >>> major
> > > >>>>>>>>>>> questions will arise for stateful operations (as Guozhang
> > > said).
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thanks for the work on this Jorge.
> > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
> > > > >>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Thanks Guozhang and John for your feedback.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of
> headers
> > in
> > > >>> our
> > > >>>>>>>>>>>> topology:
> > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > > >>>>>>> straight-forward.
> > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> > > >> straight-forward.
> > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> > > >> joins?
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Agree. Probably point 3 handles this. `Headers` being part
> of
> > > >>>>>>>>>>>> `RecordContext` would be handled the same way as other
> > > >>> attributes.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers
> to
> > > >>>>>>>>>>>> filter/map/branch",
> > > >>>>>>>>>>>> it may well be covered in KIP-159; worth taking a look at
> > that
> > > >>> KIP.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Yes, I will point to it.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the state
> > > >> store
> > > >>>>>>>>>>>> cache include the headers then in order to be sent
> > > downstreams?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Good question. As `LRUCacheEntry` extends
> `RecordContext`, I
> > > > >>> think
> > > >>>>>>>>> this
> > > >>>>>>>>>>> is
> > > >>>>>>>>>>>> already supported. I will detail this on the KIP.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers
> headers)",
> > > >> this
> > > >>>>>>> should
> > > >>>>>>>>>>> be
> > > >>>>>>>>>>>> removed?
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Fixed, thanks.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our
> > scope
> > > >> is
> > > >>>>>>> only
> > > >>>>>>>>>>>> for exposing
> > > >>>>>>>>>>>> the headers for reading, and not allowing users to add /
> > > modify
> > > >>>>>>>>> headers,
> > > >>>>>>>>>>>> right? If yes, we'd better state it clearly at the
> "Proposed
> > > >>>>> Changes"
> > > >>>>>>>>>>>> section.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> As headers are exposed in the `ProcessorContext`, and headers
> > > will
> > > >>> be
> > > > >>>>>>> sent
> > > >>>>>>>>>>>> downstream, it can be mutated (add/remove headers).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>  > Also, despite the decreased scope in this KIP, I think
> it
> > > >>> might
> > > >>>>> be
> > > >>>>>>>>>>>> valuable to define what will happen to headers once this
> > > change
> > > >>> is
> > > >>>>>>>>>>>> implemented. For example, I think a minimal
> groundwork-level
> > > >>> change
> > > >>>>>>>>> might
> > > >>>>>>>>>>>> be to make the API changes, while promising to drop all
> > > headers
> > > >>>>> from
> > > >>>>>>>>>>> input
> > > >>>>>>>>>>>> records.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I will suggest to pass headers to downstream nodes, and
> > don't
> > > >>> drop
> > > > >>>>>>>>> them.
> > > >>>>>>>>>>>> Clients will have to drop `Headers` if they have used
> them.
> > > >>>>>>>>>>>> Or it could be something like a boolean config property
> that
> > > > >>> manages
> > > >>>>>>>>> this.
> > > >>>>>>>>>>>> I would like to hear feedback here.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> A maximal groundwork change would be to forward the
> headers
> > > >>>>> through
> > > >>>>>>>>> all
> > > >>>>>>>>>>>> operators
> > > >>>>>>>>>>>> in
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Streams. But I think there are some unresolved questions
> > about
> > > >>>>>>>>>>> forwarding,
> > > >>>>>>>>>>>> like "what happens to the headers in a join?"
> > > > >>>>>>>>>>>> Probably this would be solved once KIP-159 is implemented
> and
> > > >>>>>>> supporting
> > > >>>>>>>>>>>> Headers.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> There's of course some middle ground, but instinctively,
> I
> > > >> think
> > > >>>>> I'd
> > > >>>>>>>>>>>> prefer to have a clear definition that headers are
> currently
> > > >>> *not*
> > > >>>>>>>>>>>> forwarded, rather than having a complex list of operators
> > that
> > > >> do
> > > >>>>> or
> > > >>>>>>>>>>> don't
> > > >>>>>>>>>>>> forward them. Plus, I think it might be tricky to define
> > this
> > > >>>>>>> behavior
> > > >>>>>>>>>>>> while not allowing the scope to return to that of your
> > > original
> > > >>>>>>>>> proposal!
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Agree. But `Headers` were forwarded *explicitly* in the
> > > >> original
> > > >>>>>>>>>>> proposal.
> > > > >>>>>>>>>>>> The current one passes it as part of `RecordContext`, so if
> > it's
> > > >>>>>>> forward
> > > >>>>>>>>> it
> > > >>>>>>>>>>>> or not is as the same as `RecordContext`.
> > > >>>>>>>>>>>> On top of this implementation, we can design how
> > > >> filter/map/join
> > > >>>>> will
> > > >>>>>>>>> be
> > > >>>>>>>>>>>> handled. Probably following KIP-159 approach.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>> Jorge.
> > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Wed, May 2, 2018 at 22:56, Guozhang Wang <wangg...@gmail.com>
> > > > >>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>> Hi Jorge,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks for the written KIP! Made a pass over it and left
> > some
> > > >>>>>>> comments
> > > >>>>>>>>>>>>> (some of them overlapped with John's):
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1. We need to have a clear inheritance protocol of
> headers
> > in
> > > >>> our
> > > >>>>>>>>>>>> topology:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 1.a. In PAPI's context.forward() call, it should be
> > > >>>>>>> straight-forward.
> > > >>>>>>>>>>>>> 1.b. In DSL stateless operators, it should be
> > > >> straight-forward.
> > > >>>>>>>>>>>>> 1.c. What about in stateful operators like aggregates and
> > > >> joins?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 2. In terms of internal implementations, should the state
> > > >> store
> > > >>>>>>> cache
> > > >>>>>>>>>>>>> include the headers then in order to be sent downstreams?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 3. In future work "Adding DSL Processors to use Headers
> to
> > > >>>>>>>>>>>>> filter/map/branch", it may well be covered in KIP-159;
> > worth
> > > >>>>> taking
> > > >>>>>>> a
> > > >>>>>>>>>>>> look
> > > >>>>>>>>>>>>> at that KIP.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 4. MINOR: "void process(K key, V value, Headers
> headers)",
> > > >> this
> > > >>>>>>> should
> > > >>>>>>>>>>> be
> > > >>>>>>>>>>>>> removed?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> 5. MINOR: it seems to be the case that in this KIP, our
> > scope
> > > >> is
> > > >>>>>>> only
> > > >>>>>>>>>>> for
> > > >>>>>>>>>>>>> exposing the headers for reading, and not allowing users
> to
> > > >> add
> > > >>> /
> > > >>>>>>>>>>> modify
> > > >>>>>>>>>>>>> headers, right? If yes, we'd better state it clearly at
> the
> > > >>>>>>> "Proposed
> > > >>>>>>>>>>>>> Changes" section.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Guozhang
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Wed, May 2, 2018 at 8:42 AM, John Roesler <
> > > >> j...@confluent.io
> > > >>>>
> > > >>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Jorge,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks for the design work.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> I agree that de-scoping the work to just the Processor
> API
> > > >> will
> > > >>>>>>> help
> > > >>>>>>>>>>>>>> contain the design and implementation complexity.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> In the KIP, it mentions that the headers would be
> > available
> > > >> in
> > > >>>>> the
> > > >>>>>>>>>>>>>> ProcessorContext, (like "context.headers()"). It also
> says
> > > >> that
> > > >>>>>>>>>>>>>> implementers would need to implement the method "void
> > > >> process(K
> > > >>>>>>> key,
> > > >>>>>>>>>>> V
> > > >>>>>>>>>>>>>> value, Headers headers);". I think maybe you meant to
> > remove
> > > >>> the
> > > >>>>>>>>>>>> proposal
> > > >>>>>>>>>>>>>> to modify "process", since it wouldn't be necessary in
> > > >>>>> conjunction
> > > >>>>>>>>>>> with
> > > >>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>> ProcessorContext change, and it's not represented in
> your
> > > PR.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Also, despite the decreased scope in this KIP, I think
> it
> > > >> might
> > > >>>>> be
> > > >>>>>>>>>>>>> valuable
> > > >>>>>>>>>>>>>> to define what will happen to headers once this change
> is
> > > >>>>>>>>>>> implemented.
> > > >>>>>>>>>>>>> For
> > > >>>>>>>>>>>>>> example, I think a minimal groundwork-level change might
> > be
> > > >> to
> > > >>>>> make
> > > >>>>>>>>>>> the
> > > >>>>>>>>>>>>> API
> > > >>>>>>>>>>>>>> changes, while promising to drop all headers from input
> > > >>> records.
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> A maximal groundwork change would be to forward the
> > headers
> > > >>>>> through
> > > >>>>>>>>>>> all
> > > >>>>>>>>>>>>>> operators in Streams. But I think there are some
> > unresolved
> > > >>>>>>> questions
> > > >>>>>>>>>>>>> about
> > > >>>>>>>>>>>>>> forwarding, like "what happens to the headers in a
> join?"
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> There's of course some middle ground, but
> instinctively, I
> > > >>> think
> > > >>>>>>> I'd
> > > >>>>>>>>>>>>> prefer
> > > >>>>>>>>>>>>>> to have a clear definition that headers are currently
> > *not*
> > > >>>>>>>>>>> forwarded,
> > > >>>>>>>>>>>>>> rather than having a complex list of operators that do
> or
> > > >> don't
> > > >>>>>>>>>>> forward
> > > >>>>>>>>>>>>>> them. Plus, I think it might be tricky to define this
> > > >> behavior
> > > >>>>>>> while
> > > >>>>>>>>>>>> not
> > > >>>>>>>>>>>>>> allowing the scope to return to that of your original
> > > >> proposal!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thanks again for the KIP,
> > > >>>>>>>>>>>>>> -John
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate
> > Otoya
> > > >> <
> > > >>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Hi Matthias,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I've created a new JIRA to track this, updated the KIP
> > and
> > > > >>>>> created
> > > >>>>>>> a
> > > >>>>>>>>>>>> PR.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Looking forward to your feedback,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Jorge.
> > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax <matth...@confluent.io>
> > > > >>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hi Jorge,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I would like to unblock this KIP to make some
> progress.
> > > The
> > > >>>>>>>>>>> tricky
> > > >>>>>>>>>>>>>>>> question of this work, seems to be how to expose
> headers
> > > at
> > > >>> DSL
> > > >>>>>>>>>>>>> level.
> > > > >>>>>>>>>>>>>>>> This relates to KIP-149 and KIP-159. However, for
> > > Processor
> > > >>>>> API,
> > > >>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>> seems to be rather straight forward to add headers to
> > the
> > > >>> API.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Thus, I would suggest to de-scope this KIP and add
> > header
> > > >>>>> support
> > > >>>>>>>>>>>> for
> > > >>>>>>>>>>>>>>>> Processor API only as a first step. If this is done,
> we
> > > can
> > > >>> see
> > > >>>>>>>>>>> in
> > > >>>>>>>>>>>> a
> > > >>>>>>>>>>>>>>>> second step, how to add headers at DSL level.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> WDYT about this proposal?
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> If you agree, please update the JIRA and KIP
> > accordingly.
> > > >>> Note,
> > > >>>>>>>>>>>> that
> > > >>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>> have two JIRA that are duplicates atm. We can scope
> them
> > > >>>>>>>>>>>> accordingly:
> > > >>>>>>>>>>>>>>>> one for PAPI only, and second as a dependent JIRA for
> > DSL.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya
> wrote:
> > > >>>>>>>>>>>>>>>>> Thanks for your feedback!
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 1. I was adding headers to KeyValue to support
> groupBy,
> > > >> but
> > > >>> I
> > > >>>>>>>>>>>> think
> > > >>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>> is
> > > >>>>>>>>>>>>>>>>> not necessary. It should be enough with mapping
> headers
> > > to
> > > >>>>>>>>>>>>> key/value
> > > >>>>>>>>>>>>>>> and
> > > >>>>>>>>>>>>>>>>> then group using current KeyValue structure.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 2. Yes. IMO key/value stores, like RocksDB, rely on
> KV
> > as
> > > >>>>>>>>>>>>> structure,
> > > >>>>>>>>>>>>>>>> hence
> > > >>>>>>>>>>>>>>>>> considering headers as part of stateful operations
> will
> > > >> not
> > > >>>>> fit
> > > >>>>>>>>>>>> in
> > > >>>>>>>>>>>>>> this
> > > > >>>>>>>>>>>>>>>>> approach and increase complexity (I cannot think of a
> > > >>> use-case
> > > >>>>>>>>>>>> that
> > > >>>>>>>>>>>>>>> need
> > > >>>>>>>>>>>>>>>>> this).
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> 3. and 4. Changes on 1. will solve this issue.
> > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I probably rushed a bit proposing this change; I was
> not
> > > >> aware
> > > >>>>> of
> > > >>>>>>>>>>>>>> KIP-159
> > > >>>>>>>>>>>>>>>> or
> > > >>>>>>>>>>>>>>>>> KAFKA-5632.
> > > >>>>>>>>>>>>>>>>> If KIP-159 is adopted and we reduce this KIP to add
> > > >> Headers
> > > >>> to
> > > >>>>>>>>>>>>>>>>> RecordContext will be enough, but I'm not sure about
> > the
> > > >>> scope
> > > >>>>>>>>>>> of
> > > >>>>>>>>>>>>>>>> KIP-159.
> > > > >>>>>>>>>>>>>>>>> If it includes stateful operations, it will be difficult to
> > > > >>>>>>>>>>>> implement
> > > >>>>>>>>>>>>>> as
> > > >>>>>>>>>>>>>>>>> stated in 2.
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>>>>>> Jorge.
> > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax <matth...@confluent.io>
> > > > >>>>>>>>>>>>>>>>> wrote:
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> Thanks for the KIP Jorge,
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> As Bill pointed out already, we should be careful
> with
> > > >>> adding
> > > >>>>>>>>>>>> new
> > > >>>>>>>>>>>>>>>>>> overloads as this contradicts the work done via
> > KIP-182.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> This KIP also seems to be related to KIP-149 and
> > > KIP-159.
> > > >>> Are
> > > >>>>>>>>>>>> you
> > > >>>>>>>>>>>>>>> aware
> > > >>>>>>>>>>>>>>>>>> of them? Both have quite long DISCUSS threads, but
> it
> > > >> might
> > > >>>>> be
> > > >>>>>>>>>>>>> worth
> > > >>>>>>>>>>>>>>>>>> browsing through them.
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> A few further questions:
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>  - why do you want to add the headers to
> `KeyValue`? I
> > > am
> > > >>> not
> > > >>>>>>>>>>>> sure
> > > >>>>>>>>>>>>>> if
> > > >>>>>>>>>>>>>>> we
> > > >>>>>>>>>>>>>>>>>> should consider headers as optional metadata and add
> > it
> > > >> to
> > > >>>>>>>>>>>>>>>>>> `RecordContext` similar to timestamp, offset, etc.
> > only
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>  - You only include stateless single-record
> > > >> transformations
> > > >>>>> at
> > > >>>>>>>>>>>> the
> > > >>>>>>>>>>>>>> DSL
> > > > >>>>>>>>>>>>>>>>>> level. Do you suggest that all other operators just
> > drop
> > > >>>>>>>>>>> headers
> > > >>>>>>>>>>>> on
> > > >>>>>>>>>>>>>> the
> > > >>>>>>>>>>>>>>>>>> floor?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>  - Why do you only want to put headers into
> in-memory
> > > and
> > > >>>>>>>>>>> cache
> > > >>>>>>>>>>>>> but
> > > >>>>>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>> RocksDB store? What do you mean by "pass through"?
> > IMHO,
> > > >>> all
> > > >>>>>>>>>>>>> stores
> > > >>>>>>>>>>>>>>>>>> should behave the same at DSL level.
> > > >>>>>>>>>>>>>>>>>>    -> if we store the headers in the state stores,
> > what
> > > >> is
> > > >>>>> the
> > > >>>>>>>>>>>>>> upgrade
> > > >>>>>>>>>>>>>>>>>> path?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>  - Why do we need to store record header in state in
> > the
> > > >>>>> first
> > > >>>>>>>>>>>>>> place,
> > > >>>>>>>>>>>>>>> if
> > > > >>>>>>>>>>>>>>>>>> we exclude stateful operators at DSL level?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> What is the motivation for the "border lines" you
> > > choose?
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> -Matthias
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > >>>>>>>>>>>>>>>>>>> Jorge,
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks for the KIP, I know this is a feature others
> > in
> > > >> the
> > > >>>>>>>>>>>>>> community
> > > >>>>>>>>>>>>>>>> have
> > > >>>>>>>>>>>>>>>>>>> been interested in getting into Kafka Streams.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I took a quick pass over it, and I have one initial
> > > >>>>> question.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> We recently reduced overloads with KIP-182, and in
> > this
> > > >>> KIP
> > > >>>>>>>>>>> we
> > > >>>>>>>>>>>>> are
> > > >>>>>>>>>>>>>>>>>>> increasing them again.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> I can see from the KIP why they are necessary, but
> > I'm
> > > >>>>>>>>>>>> wondering
> > > >>>>>>>>>>>>> if
> > > >>>>>>>>>>>>>>>> there
> > > >>>>>>>>>>>>>>>>>>> is something else we can do to cut down on the
> > > overloads
> > > >>>>>>>>>>>>>>> introduced.  I
> > > >>>>>>>>>>>>>>>>>>> don't have any sound suggestions ATM, so I'll have
> to
> > > >>> think
> > > >>>>>>>>>>>> about
> > > >>>>>>>>>>>>>> it
> > > >>>>>>>>>>>>>>>> some
> > > >>>>>>>>>>>>>>>>>>> more, but I wanted to put the thought out there.
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>>>>>>>> Bill
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban
> > Quilcate
> > > >>>>>>>>>>> Otoya <
> > > >>>>>>>>>>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> I have created a KIP to add Record Headers support
> > to
> > > >>> Kafka
> > > >>>>>>>>>>>>>> Streams
> > > >>>>>>>>>>>>>>>> API:
> > > > >>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> The main goal is to be able to use headers to
> > filter,
> > > >> map
> > > >>>>>>>>>>> and
> > > >>>>>>>>>>>>>>> process
> > > >>>>>>>>>>>>>>>>>>>> records as streams. Stateful processing (joins,
> > > >> windows)
> > > > >>>>> is
> > > >>>>>>>>>>>> not
> > > >>>>>>>>>>>>>>>>>>>> considered.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Proposed changes/Draft:
> > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Feedback and suggestions are more than welcome.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Cheers,
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>> Jorge.
> > > >>>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> --
> > > >>>>>>>>>>>>> -- Guozhang
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>
> > > >
> > > >
> > > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
