Hi,

For me this is a great first step to have Headers in streaming.
My current use case is about distributed tracing (Zipkin) and with the
headers in the processorContext() I'll be able to manage that for the most
cases.
The KIP-159 should follow after this but this is where all the major
questions will arise for stateful operations (as Guozhang said).

Thanks for the work on this Jorge.

Le ven. 4 mai 2018 à 01:04, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> a écrit :

> Thanks Guozhang and John for your feedback.
>
> > 1. We need to have a clear inheritance protocol of headers in our
> topology:
> > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > 1.b. In DSL stateless operators, it should be straight-forward.
> > 1.c. What about in stateful operators like aggregates and joins?
>
> Agree. Probably point 3 handles this. `Headers` been part of
> `RecordContext` would be handled the same way as other attributes.
>
> > 3. In future work "Adding DSL Processors to use Headers to
> filter/map/branch",
> it may well be covered in KIP-159; worth taking a look at that KIP.
>
> Yes, I will point to it.
>
> > 2. In terms of internal implementations, should the state store
> cache include the headers then in order to be sent downstreams?
>
> Good question. As `LRUCacheEntry` extends `RecordContext`, I thinks this is
> already supported. I will detail this on the KIP.
>
> > 4. MINOR: "void process(K key, V value, Headers headers)", this should be
> removed?
>
> Fixed, thanks.
>
> > 5. MINOR: it seems to be the case that in this KIP, our scope is only
> for exposing
> the headers for reading, and not allowing users to add / modify headers,
> right? If yes, we'd better state it clearly at the "Proposed Changes"
> section.
>
> As headers is exposed in the `ProcessContext`, and headers will be send
> downstream, it can be mutated (add/remove headers).
>
>  > Also, despite the decreased scope in this KIP, I think it might be
> valuable to define what will happen to headers once this change is
> implemented. For example, I think a minimal groundwork-level change might
> be to make the API changes, while promising to drop all headers from input
> records.
>
> I will suggest to pass headers to downstream nodes, and don't drop yhrm.
> Clients will have to drop `Headers` if they have used them.
> Or it could be something like a boolean config property that manage this.
> I would like to hear feedback here.
>
> > A maximal groundwork change would be to forward the headers through all
> operators
> in
>
> Streams. But I think there are some unresolved questions about forwarding,
> like "what happens to the headers in a join?"
> Probably this would be solve once KIP-159 is implemented and supporting
> Headers.
>
> > There's of course some middle ground, but instinctively, I think I'd
> prefer to have a clear definition that headers are currently *not*
> forwarded, rather than having a complex list of operators that do or don't
> forward them. Plus, I think it might be tricky to define this behavior
> while not allowing the scope to return to that of your original proposal!
>
> Agree. But `Headers` were forwarded *explicitly* in the original proposal.
> The current one pass it as part of `RecordContext`, so if it's forward it
> or not is as the same as `RecordContext`.
> On top of this implementation, we can design how filter/map/join will be
> handled. Probably following KIP-159 approach.
>
> Cheers,
> Jorge.
>
> El mié., 2 may. 2018 a las 22:56, Guozhang Wang (<wangg...@gmail.com>)
> escribió:
>
> > Hi Jorge,
> >
> > Thanks for the written KIP! Made a pass over it and left some comments
> > (some of them overlapped with John's):
> >
> > 1. We need to have a clear inheritance protocol of headers in our
> topology:
> >
> > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > 1.b. In DSL stateless operators, it should be straight-forward.
> > 1.c. What about in stateful operators like aggregates and joins?
> >
> > 2. In terms of internal implementations, should the state store cache
> > include the headers then in order to be sent downstreams?
> >
> > 3. In future work "Adding DSL Processors to use Headers to
> > filter/map/branch", it may well be covered in KIP-159; worth taking a
> look
> > at that KIP.
> >
> > 4. MINOR: "void process(K key, V value, Headers headers)", this should be
> > removed?
> >
> > 5. MINOR: it seems to be the case that in this KIP, our scope is only for
> > exposing the headers for reading, and not allowing users to add / modify
> > headers, right? If yes, we'd better state it clearly at the "Proposed
> > Changes" section.
> >
> >
> > Guozhang
> >
> >
> > On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the design work.
> > >
> > > I agree that de-scoping the work to just the Processor API will help
> > > contain the design and implementation complexity.
> > >
> > > In the KIP, it mentions that the headers would be available in the
> > > ProcessorContext, (like "context.headers()"). It also says that
> > > implementers would need to implement the method "void process(K key, V
> > > value, Headers headers);". I think maybe you meant to remove the
> proposal
> > > to modify "process", since it wouldn't be necessary in conjunction with
> > the
> > > ProcessorContext change, and it's not represented in your PR.
> > >
> > > Also, despite the decreased scope in this KIP, I think it might be
> > valuable
> > > to define what will happen to headers once this change is implemented.
> > For
> > > example, I think a minimal groundwork-level change might be to make the
> > API
> > > changes, while promising to drop all headers from input records.
> > >
> > > A maximal groundwork change would be to forward the headers through all
> > > operators in Streams. But I think there are some unresolved questions
> > about
> > > forwarding, like "what happens to the headers in a join?"
> > >
> > > There's of course some middle ground, but instinctively, I think I'd
> > prefer
> > > to have a clear definition that headers are currently *not* forwarded,
> > > rather than having a complex list of operators that do or don't forward
> > > them. Plus, I think it might be tricky to define this behavior while
> not
> > > allowing the scope to return to that of your original proposal!
> > >
> > > Thanks again for the KIP,
> > > -John
> > >
> > > On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > > > Hi Matthias,
> > > >
> > > > I've created a new JIRA to track this, updated the KIP and create a
> PR.
> > > >
> > > > Looking forward to your feedback,
> > > >
> > > > Jorge.
> > > >
> > > > El mar., 13 feb. 2018 a las 22:43, Matthias J. Sax (<
> > > matth...@confluent.io
> > > > >)
> > > > escribió:
> > > >
> > > > > Hi Jorge,
> > > > >
> > > > > I would like to unblock this KIP to make some progress. The tricky
> > > > > question of this work, seems to be how to expose headers at DSL
> > level.
> > > > > This related to KIP-149 and KIP-159. However, for Processor API, it
> > > > > seems to be rather straight forward to add headers to the API.
> > > > >
> > > > > Thus, I would suggest to de-scope this KIP and add header support
> for
> > > > > Processor API only as a first step. If this is done, we can see in
> a
> > > > > second step, how to add headers at DSL level.
> > > > >
> > > > > WDYT about this proposal?
> > > > >
> > > > > If you agree, please update the JIRA and KIP accordingly. Note,
> that
> > we
> > > > > have two JIRA that are duplicates atm. We can scope them
> accordingly:
> > > > > one for PAPI only, and second as a dependent JIRA for DSL.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > Thanks for your feedback!
> > > > > >
> > > > > > 1. I was adding headers to KeyValue to support groupBy, but I
> think
> > > it
> > > > is
> > > > > > not necessary. It should be enough with mapping headers to
> > key/value
> > > > and
> > > > > > then group using current KeyValue structure.
> > > > > >
> > > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as
> > structure,
> > > > > hence
> > > > > > considering headers as part of stateful operations will not fit
> in
> > > this
> > > > > > approach and increase complexity (I cannot think in a use-case
> that
> > > > need
> > > > > > this).
> > > > > >
> > > > > > 3. and 4. Changes on 1. will solve this issue.
> > > > > >
> > > > > > Probably I rush a bit proposing this change, I was not aware of
> > > KIP-159
> > > > > or
> > > > > > KAFKA-5632.
> > > > > > If KIP-159 is adopted and we reduce this KIP to add Headers to
> > > > > > RecordContext will be enough, but I'm not sure about the scope of
> > > > > KIP-159.
> > > > > > If it includes stateful operations will be difficult to
> implemented
> > > as
> > > > > > stated in 2.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > >
> > > > > > El mar., 26 dic. 2017 a las 20:04, Matthias J. Sax (<
> > > > > matth...@confluent.io>)
> > > > > > escribió:
> > > > > >
> > > > > >> Thanks for the KIP Jorge,
> > > > > >>
> > > > > >> As Bill pointed out already, we should be careful with adding
> new
> > > > > >> overloads as this contradicts the work done via KIP-182.
> > > > > >>
> > > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are
> you
> > > > aware
> > > > > >> of them? Both have quite long DISCUSS threads, but it might be
> > worth
> > > > > >> browsing through them.
> > > > > >>
> > > > > >> A few further questions:
> > > > > >>
> > > > > >>  - why do you want to add the headers to `KeyValue`? I am not
> sure
> > > if
> > > > we
> > > > > >> should consider headers as optional metadata and add it to
> > > > > >> `RecordContext` similar to timestamp, offset, etc. only
> > > > > >
> > > > > >
> > > > > >>  - You only include stateless single-record transformations at
> the
> > > DSL
> > > > > >> level. Do you suggest that all other operator just drop headers
> on
> > > the
> > > > > >> floor?
> > > > > >>
> > > > > >>  - Why do you only want to put headers into in-memory and cache
> > but
> > > > not
> > > > > >> RocksDB store? What do you mean by "pass through"? IMHO, all
> > stores
> > > > > >> should behave the same at DSL level.
> > > > > >>    -> if we store the headers in the state stores, what is the
> > > upgrade
> > > > > >> path?
> > > > > >>
> > > > > >>  - Why do we need to store record header in state in the first
> > > place,
> > > > if
> > > > > >> we exclude stateful operator at DSL level?
> > > > > >>
> > > > > >>
> > > > > >> What is the motivation for the "border lines" you choose?
> > > > > >>
> > > > > >>
> > > > > >> -Matthias
> > > > > >>
> > > > > >>
> > > > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > > >>> Jorge,
> > > > > >>>
> > > > > >>> Thanks for the KIP, I know this is a feature others in the
> > > community
> > > > > have
> > > > > >>> been interested in getting into Kafka Streams.
> > > > > >>>
> > > > > >>> I took a quick pass over it, and I have one initial question.
> > > > > >>>
> > > > > >>> We recently reduced overloads with KIP-182, and in this KIP we
> > are
> > > > > >>> increasing them again.
> > > > > >>>
> > > > > >>> I can see from the KIP why they are necessary, but I'm
> wondering
> > if
> > > > > there
> > > > > >>> is something else we can do to cut down on the overloads
> > > > introduced.  I
> > > > > >>> don't have any sound suggestions ATM, so I'll have to think
> about
> > > it
> > > > > some
> > > > > >>> more, but I wanted to put the thought out there.
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>> Bill
> > > > > >>>
> > > > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <
> > > > > >>> quilcate.jo...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi all,
> > > > > >>>>
> > > > > >>>> I have created a KIP to add Record Headers support to Kafka
> > > Streams
> > > > > API:
> > > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> The main goal is to be able to use headers to filter, map and
> > > > process
> > > > > >>>> records as streams. Stateful processing (joins, windows) are
> not
> > > > > >>>> considered.
> > > > > >>>>
> > > > > >>>> Proposed changes/Draft:
> > > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:
> > > > streams-headers
> > > > > >>>>
> > > > > >>>> Feedback and suggestions are more than welcome.
> > > > > >>>>
> > > > > >>>> Cheers,
> > > > > >>>>
> > > > > >>>> Jorge.
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to