Hello Jorge,

> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext`
> would be handled the same way as other attributes.

Today we do not yet have a clear inheritance protocol for the other fields
of RecordContext: although internally we do have some criteria for
topic/partition/offset and timestamp, they are not explicitly exposed to
users.


I think we still need a defined protocol for headers themselves, but I
agree that it is better scoped outside of this KIP, since an inheritance
protocol for all the fields of RecordContext would better be a separate
KIP. We can document this clearly in the wiki page.

Guozhang
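
To make the "handled the same way as other attributes" idea concrete, here
is a minimal sketch in plain Java. It assumes one possible inheritance rule
(a forwarded record keeps the context of its input untouched); the thread
itself leaves the actual protocol open. The names HeaderInheritanceSketch,
Context, Message and forward are hypothetical stand-ins for illustration,
not Kafka Streams classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model for illustration only; these are NOT Kafka's classes.
final class HeaderInheritanceSketch {

    /** Stand-in for per-record metadata: headers sit next to topic/partition/offset/timestamp. */
    static final class Context {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;
        final Map<String, byte[]> headers = new LinkedHashMap<>();

        Context(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** A key/value pair plus the context it inherited from its input record. */
    static final class Message {
        final String key;
        final String value;
        final Context context;

        Message(String key, String value, Context context) {
            this.key = key;
            this.value = value;
            this.context = context;
        }
    }

    /** Assumed rule: forwarding may change key/value but passes the context along as-is. */
    static Message forward(Message input, String newKey, String newValue) {
        return new Message(newKey, newValue, input.context);
    }

    public static void main(String[] args) {
        Context ctx = new Context("orders", 0, 42L, 1525400000000L);
        ctx.headers.put("trace-id", "abc123".getBytes(StandardCharsets.UTF_8));

        Message in = new Message("k", "v", ctx);
        Message out = forward(in, "k", "V");

        // The header travels with the context, exactly like the offset does.
        System.out.println(new String(out.context.headers.get("trace-id"), StandardCharsets.UTF_8));
        System.out.println(out.context.offset);
    }
}
```

Under this assumed rule the stateless case is trivial. The open questions
raised in the thread (joins and aggregates, where one output has several
input contexts) are exactly what such a sketch does not answer.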


On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <
garcia.florian.pe...@gmail.com> wrote:

> Hi,
>
> For me this is a great first step to have Headers in streaming.
> My current use case is about distributed tracing (Zipkin) and with the
> headers in the processorContext() I'll be able to manage that for the most
> cases.
> The KIP-159 should follow after this but this is where all the major
> questions will arise for stateful operations (as Guozhang said).
>
> Thanks for the work on this Jorge.
>
> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Thanks Guozhang and John for your feedback.
> >
> > > 1. We need to have a clear inheritance protocol of headers in our
> > topology:
> > > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > 1.b. In DSL stateless operators, it should be straight-forward.
> > > 1.c. What about in stateful operators like aggregates and joins?
> >
> > Agree. Probably point 3 handles this. `Headers` being part of
> > `RecordContext` would be handled the same way as other attributes.
> >
> > > 3. In future work "Adding DSL Processors to use Headers to
> > filter/map/branch",
> > it may well be covered in KIP-159; worth taking a look at that KIP.
> >
> > Yes, I will point to it.
> >
> > > 2. In terms of internal implementations, should the state store
> > cache include the headers then in order to be sent downstreams?
> >
> > Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is
> > already supported. I will detail this in the KIP.
> >
> > > 4. MINOR: "void process(K key, V value, Headers headers)", this should
> be
> > removed?
> >
> > Fixed, thanks.
> >
> > > 5. MINOR: it seems to be the case that in this KIP, our scope is only
> > for exposing
> > the headers for reading, and not allowing users to add / modify headers,
> > right? If yes, we'd better state it clearly at the "Proposed Changes"
> > section.
> >
> > As headers are exposed in the `ProcessorContext` and will be sent
> > downstream, they can be mutated (headers added/removed).
> >
> >  > Also, despite the decreased scope in this KIP, I think it might be
> > valuable to define what will happen to headers once this change is
> > implemented. For example, I think a minimal groundwork-level change might
> > be to make the API changes, while promising to drop all headers from
> input
> > records.
> >
> > I would suggest passing headers to downstream nodes rather than dropping them.
> > Clients will have to drop `Headers` if they have used them.
> > Or there could be something like a boolean config property that manages this.
> > I would like to hear feedback here.
> >
> > > A maximal groundwork change would be to forward the headers through all
> > > operators in Streams. But I think there are some unresolved questions about
> > > forwarding, like "what happens to the headers in a join?"
> >
> > Probably this would be solved once KIP-159 is implemented and supports
> > Headers.
> >
> > > There's of course some middle ground, but instinctively, I think I'd
> > prefer to have a clear definition that headers are currently *not*
> > forwarded, rather than having a complex list of operators that do or
> don't
> > forward them. Plus, I think it might be tricky to define this behavior
> > while not allowing the scope to return to that of your original proposal!
> >
> > Agree. But `Headers` were forwarded *explicitly* in the original proposal.
> > The current one passes them as part of `RecordContext`, so whether they are
> > forwarded or not is the same as for `RecordContext`.
> > On top of this implementation, we can design how filter/map/join will be
> > handled, probably following the KIP-159 approach.
> >
> > Cheers,
> > Jorge.
> >
> > On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>)
> > wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the written KIP! Made a pass over it and left some comments
> > > (some of them overlapped with John's):
> > >
> > > 1. We need to have a clear inheritance protocol of headers in our
> > topology:
> > >
> > > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > 1.b. In DSL stateless operators, it should be straight-forward.
> > > 1.c. What about in stateful operators like aggregates and joins?
> > >
> > > 2. In terms of internal implementations, should the state store cache
> > > include the headers then in order to be sent downstreams?
> > >
> > > 3. In future work "Adding DSL Processors to use Headers to
> > > filter/map/branch", it may well be covered in KIP-159; worth taking a
> > look
> > > at that KIP.
> > >
> > > 4. MINOR: "void process(K key, V value, Headers headers)", this should
> be
> > > removed?
> > >
> > > 5. MINOR: it seems to be the case that in this KIP, our scope is only
> for
> > > exposing the headers for reading, and not allowing users to add /
> modify
> > > headers, right? If yes, we'd better state it clearly at the "Proposed
> > > Changes" section.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io>
> wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thanks for the design work.
> > > >
> > > > I agree that de-scoping the work to just the Processor API will help
> > > > contain the design and implementation complexity.
> > > >
> > > > In the KIP, it mentions that the headers would be available in the
> > > > ProcessorContext, (like "context.headers()"). It also says that
> > > > implementers would need to implement the method "void process(K key,
> V
> > > > value, Headers headers);". I think maybe you meant to remove the
> > proposal
> > > > to modify "process", since it wouldn't be necessary in conjunction
> with
> > > the
> > > > ProcessorContext change, and it's not represented in your PR.
> > > >
> > > > Also, despite the decreased scope in this KIP, I think it might be
> > > valuable
> > > > to define what will happen to headers once this change is
> implemented.
> > > For
> > > > example, I think a minimal groundwork-level change might be to make
> the
> > > API
> > > > changes, while promising to drop all headers from input records.
> > > >
> > > > A maximal groundwork change would be to forward the headers through
> all
> > > > operators in Streams. But I think there are some unresolved questions
> > > about
> > > > forwarding, like "what happens to the headers in a join?"
> > > >
> > > > There's of course some middle ground, but instinctively, I think I'd
> > > prefer
> > > > to have a clear definition that headers are currently *not*
> forwarded,
> > > > rather than having a complex list of operators that do or don't
> forward
> > > > them. Plus, I think it might be tricky to define this behavior while
> > not
> > > > allowing the scope to return to that of your original proposal!
> > > >
> > > > Thanks again for the KIP,
> > > > -John
> > > >
> > > > On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jo...@gmail.com> wrote:
> > > >
> > > > > Hi Matthias,
> > > > >
> > > > > > I've created a new JIRA to track this, updated the KIP and created a PR.
> > > > >
> > > > > Looking forward to your feedback,
> > > > >
> > > > > Jorge.
> > > > >
> > > > > > On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>)
> > > > > > wrote:
> > > > >
> > > > > > Hi Jorge,
> > > > > >
> > > > > > > I would like to unblock this KIP to make some progress. The tricky
> > > > > > > question of this work seems to be how to expose headers at DSL level.
> > > > > > > This is related to KIP-149 and KIP-159. However, for Processor API, it
> > > > > > > seems to be rather straightforward to add headers to the API.
> > > > > > Thus, I would suggest to de-scope this KIP and add header support
> > for
> > > > > > Processor API only as a first step. If this is done, we can see
> in
> > a
> > > > > > second step, how to add headers at DSL level.
> > > > > >
> > > > > > WDYT about this proposal?
> > > > > >
> > > > > > If you agree, please update the JIRA and KIP accordingly. Note,
> > that
> > > we
> > > > > > have two JIRA that are duplicates atm. We can scope them
> > accordingly:
> > > > > > one for PAPI only, and second as a dependent JIRA for DSL.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > Thanks for your feedback!
> > > > > > >
> > > > > > > > 1. I was adding headers to KeyValue to support groupBy, but I think it
> > > > > > > > is not necessary. It should be enough to map headers to key/value and
> > > > > > > > then group using the current KeyValue structure.
> > > > > > >
> > > > > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as structure,
> > > > > > > > hence considering headers as part of stateful operations will not fit
> > > > > > > > in this approach and will increase complexity (I cannot think of a
> > > > > > > > use case that needs this).
> > > > > > >
> > > > > > > 3. and 4. Changes on 1. will solve this issue.
> > > > > > >
> > > > > > > > I probably rushed a bit in proposing this change; I was not aware of
> > > > > > > > KIP-159 or KAFKA-5632.
> > > > > > > > If KIP-159 is adopted, reducing this KIP to adding Headers to
> > > > > > > > RecordContext will be enough, but I'm not sure about the scope of
> > > > > > > > KIP-159. If it includes stateful operations, it will be difficult to
> > > > > > > > implement, as stated in 2.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jorge.
> > > > > > >
> > > > > > > > On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>)
> > > > > > > > wrote:
> > > > > > >
> > > > > > >> Thanks for the KIP Jorge,
> > > > > > >>
> > > > > > >> As Bill pointed out already, we should be careful with adding
> > new
> > > > > > >> overloads as this contradicts the work done via KIP-182.
> > > > > > >>
> > > > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are
> > you
> > > > > aware
> > > > > > >> of them? Both have quite long DISCUSS threads, but it might be
> > > worth
> > > > > > >> browsing through them.
> > > > > > >>
> > > > > > >> A few further questions:
> > > > > > >>
> > > > > > >>  - why do you want to add the headers to `KeyValue`? I am not
> > sure
> > > > if
> > > > > we
> > > > > > >> should consider headers as optional metadata and add it to
> > > > > > >> `RecordContext` similar to timestamp, offset, etc. only
> > > > > > >
> > > > > > >
> > > > > > > >>  - You only include stateless single-record transformations at the DSL
> > > > > > > >> level. Do you suggest that all other operators just drop headers on
> > > > > > > >> the floor?
> > > > > > >>
> > > > > > >>  - Why do you only want to put headers into in-memory and
> cache
> > > but
> > > > > not
> > > > > > >> RocksDB store? What do you mean by "pass through"? IMHO, all
> > > stores
> > > > > > >> should behave the same at DSL level.
> > > > > > >>    -> if we store the headers in the state stores, what is the
> > > > upgrade
> > > > > > >> path?
> > > > > > >>
> > > > > > >>  - Why do we need to store record header in state in the first
> > > > place,
> > > > > if
> > > > > > >> we exclude stateful operator at DSL level?
> > > > > > >>
> > > > > > >>
> > > > > > >> What is the motivation for the "border lines" you choose?
> > > > > > >>
> > > > > > >>
> > > > > > >> -Matthias
> > > > > > >>
> > > > > > >>
> > > > > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > > > >>> Jorge,
> > > > > > >>>
> > > > > > >>> Thanks for the KIP, I know this is a feature others in the
> > > > community
> > > > > > have
> > > > > > >>> been interested in getting into Kafka Streams.
> > > > > > >>>
> > > > > > >>> I took a quick pass over it, and I have one initial question.
> > > > > > >>>
> > > > > > >>> We recently reduced overloads with KIP-182, and in this KIP
> we
> > > are
> > > > > > >>> increasing them again.
> > > > > > >>>
> > > > > > >>> I can see from the KIP why they are necessary, but I'm
> > wondering
> > > if
> > > > > > there
> > > > > > >>> is something else we can do to cut down on the overloads
> > > > > introduced.  I
> > > > > > >>> don't have any sound suggestions ATM, so I'll have to think
> > about
> > > > it
> > > > > > some
> > > > > > >>> more, but I wanted to put the thought out there.
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Bill
> > > > > > >>>
> > > > > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate
> Otoya <
> > > > > > >>> quilcate.jo...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>>> Hi all,
> > > > > > >>>>
> > > > > > >>>> I have created a KIP to add Record Headers support to Kafka
> > > > Streams
> > > > > > API:
> > > > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > >>>> 244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> The main goal is to be able to use headers to filter, map
> and
> > > > > process
> > > > > > >>>> records as streams. Stateful processing (joins, windows) are
> > not
> > > > > > >>>> considered.
> > > > > > >>>>
> > > > > > >>>> Proposed changes/Draft:
> > > > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:
> > > > > streams-headers
> > > > > > >>>>
> > > > > > >>>> Feedback and suggestions are more than welcome.
> > > > > > >>>>
> > > > > > >>>> Cheers,
> > > > > > >>>>
> > > > > > >>>> Jorge.
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >>
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>



-- 
-- Guozhang
