Hello Jorge,

> Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.

Today we do not yet have a clear inheritance protocol for the other fields of RecordContext: although internally we do have some criteria for topic/partition/offset and timestamp, they are not explicitly exposed to users. I think we still need a defined protocol for headers themselves, but I agree that it is better scoped outside of this KIP, since an inheritance protocol for all the fields of RecordContext would better be a separate KIP. We can document this clearly on the wiki page.

Guozhang

On Fri, May 4, 2018 at 5:26 AM, Florian Garcia <garcia.florian.pe...@gmail.com> wrote:

> Hi,
>
> For me this is a great first step to have Headers in streaming.
> My current use case is about distributed tracing (Zipkin), and with the headers in the processorContext() I'll be able to manage that for most cases.
> KIP-159 should follow after this, but this is where all the major questions will arise for stateful operations (as Guozhang said).
>
> Thanks for the work on this Jorge.
>
> On Fri, May 4, 2018 at 01:04, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>
> > Thanks Guozhang and John for your feedback.
> >
> > > 1. We need to have a clear inheritance protocol of headers in our topology:
> > > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > 1.b. In DSL stateless operators, it should be straight-forward.
> > > 1.c. What about in stateful operators like aggregates and joins?
> >
> > Agree. Probably point 3 handles this. `Headers` being part of `RecordContext` would be handled the same way as other attributes.
> >
> > > 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
> >
> > Yes, I will point to it.
> >
> > > 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
> >
> > Good question. As `LRUCacheEntry` extends `RecordContext`, I think this is already supported. I will detail this in the KIP.
> >
> > > 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
> >
> > Fixed, thanks.
> >
> > > 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
> >
> > As headers are exposed in the `ProcessorContext`, and headers will be sent downstream, they can be mutated (add/remove headers).
> >
> > > Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
> >
> > I would suggest passing headers to downstream nodes and not dropping them. Clients will have to drop `Headers` if they have used them.
> > Or there could be something like a boolean config property that manages this.
> > I would like to hear feedback here.
> >
> > > A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
> >
> > Probably this would be solved once KIP-159 is implemented and supports Headers.
> >
> > > There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
> >
> > Agree. But `Headers` were forwarded *explicitly* in the original proposal.
> > The current one passes them as part of `RecordContext`, so whether they are forwarded or not is the same as for `RecordContext`.
> > On top of this implementation, we can design how filter/map/join will be handled, probably following the KIP-159 approach.
> >
> > Cheers,
> > Jorge.
> >
> > On Wed, May 2, 2018 at 22:56, Guozhang Wang (<wangg...@gmail.com>) wrote:
> >
> > > Hi Jorge,
> > >
> > > Thanks for the written KIP! Made a pass over it and left some comments (some of them overlapped with John's):
> > >
> > > 1. We need to have a clear inheritance protocol of headers in our topology:
> > >
> > > 1.a. In PAPI's context.forward() call, it should be straight-forward.
> > > 1.b. In DSL stateless operators, it should be straight-forward.
> > > 1.c. What about in stateful operators like aggregates and joins?
> > >
> > > 2. In terms of internal implementations, should the state store cache include the headers then in order to be sent downstream?
> > >
> > > 3. In future work "Adding DSL Processors to use Headers to filter/map/branch", it may well be covered in KIP-159; worth taking a look at that KIP.
> > >
> > > 4. MINOR: "void process(K key, V value, Headers headers)", this should be removed?
> > >
> > > 5. MINOR: it seems to be the case that in this KIP, our scope is only for exposing the headers for reading, and not allowing users to add / modify headers, right? If yes, we'd better state it clearly at the "Proposed Changes" section.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, May 2, 2018 at 8:42 AM, John Roesler <j...@confluent.io> wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thanks for the design work.
> > > >
> > > > I agree that de-scoping the work to just the Processor API will help contain the design and implementation complexity.
> > > >
> > > > In the KIP, it mentions that the headers would be available in the ProcessorContext (like "context.headers()"). It also says that implementers would need to implement the method "void process(K key, V value, Headers headers);". I think maybe you meant to remove the proposal to modify "process", since it wouldn't be necessary in conjunction with the ProcessorContext change, and it's not represented in your PR.
> > > >
> > > > Also, despite the decreased scope in this KIP, I think it might be valuable to define what will happen to headers once this change is implemented. For example, I think a minimal groundwork-level change might be to make the API changes, while promising to drop all headers from input records.
> > > >
> > > > A maximal groundwork change would be to forward the headers through all operators in Streams. But I think there are some unresolved questions about forwarding, like "what happens to the headers in a join?"
> > > >
> > > > There's of course some middle ground, but instinctively, I think I'd prefer to have a clear definition that headers are currently *not* forwarded, rather than having a complex list of operators that do or don't forward them. Plus, I think it might be tricky to define this behavior while not allowing the scope to return to that of your original proposal!
> > > >
> > > > Thanks again for the KIP,
> > > > -John
> > > >
> > > > On Wed, May 2, 2018 at 8:05 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > >
> > > > > Hi Matthias,
> > > > >
> > > > > I've created a new JIRA to track this, updated the KIP and created a PR.
> > > > >
> > > > > Looking forward to your feedback,
> > > > >
> > > > > Jorge.
> > > > >
> > > > > On Tue, Feb 13, 2018 at 22:43, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > >
> > > > > > Hi Jorge,
> > > > > >
> > > > > > I would like to unblock this KIP to make some progress. The tricky question of this work seems to be how to expose headers at the DSL level. This relates to KIP-149 and KIP-159. However, for the Processor API, it seems to be rather straightforward to add headers to the API.
> > > > > >
> > > > > > Thus, I would suggest de-scoping this KIP and adding header support for the Processor API only as a first step. If this is done, we can see in a second step how to add headers at the DSL level.
> > > > > >
> > > > > > WDYT about this proposal?
> > > > > >
> > > > > > If you agree, please update the JIRA and KIP accordingly. Note that we have two JIRAs that are duplicates atm. We can scope them accordingly: one for PAPI only, and the second as a dependent JIRA for DSL.
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 12/30/17 3:11 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > Thanks for your feedback!
> > > > > > >
> > > > > > > 1. I was adding headers to KeyValue to support groupBy, but I think it is not necessary. It should be enough to map headers to key/value and then group using the current KeyValue structure.
> > > > > > >
> > > > > > > 2. Yes. IMO key/value stores, like RocksDB, rely on KV as their structure, hence considering headers as part of stateful operations will not fit this approach and will increase complexity (I cannot think of a use case that needs this).
> > > > > > >
> > > > > > > 3. and 4. Changes on 1. will solve this issue.
> > > > > > >
> > > > > > > I probably rushed a bit in proposing this change; I was not aware of KIP-159 or KAFKA-5632.
> > > > > > > If KIP-159 is adopted, reducing this KIP to adding Headers to RecordContext will be enough, but I'm not sure about the scope of KIP-159.
> > > > > > > If it includes stateful operations, it will be difficult to implement, as stated in 2.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jorge.
> > > > > > >
> > > > > > > On Tue, Dec 26, 2017 at 20:04, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > > > >
> > > > > > >> Thanks for the KIP Jorge,
> > > > > > >>
> > > > > > >> As Bill pointed out already, we should be careful with adding new overloads as this contradicts the work done via KIP-182.
> > > > > > >>
> > > > > > >> This KIP also seems to be related to KIP-149 and KIP-159. Are you aware of them? Both have quite long DISCUSS threads, but it might be worth browsing through them.
> > > > > > >>
> > > > > > >> A few further questions:
> > > > > > >>
> > > > > > >> - Why do you want to add the headers to `KeyValue`? I am not sure if we should consider headers as optional metadata and add them to `RecordContext` only, similar to timestamp, offset, etc.
> > > > > > >>
> > > > > > >> - You only include stateless single-record transformations at the DSL level. Do you suggest that all other operators just drop headers on the floor?
> > > > > > >>
> > > > > > >> - Why do you only want to put headers into the in-memory store and cache but not the RocksDB store? What do you mean by "pass through"? IMHO, all stores should behave the same at the DSL level.
> > > > > > >>   -> If we store the headers in the state stores, what is the upgrade path?
> > > > > > >>
> > > > > > >> - Why do we need to store record headers in state in the first place, if we exclude stateful operators at the DSL level?
> > > > > > >>
> > > > > > >>
> > > > > > >> What is the motivation for the "border lines" you chose?
> > > > > > >>
> > > > > > >>
> > > > > > >> -Matthias
> > > > > > >>
> > > > > > >>
> > > > > > >> On 12/21/17 8:18 AM, Bill Bejeck wrote:
> > > > > > >>> Jorge,
> > > > > > >>>
> > > > > > >>> Thanks for the KIP, I know this is a feature others in the community have been interested in getting into Kafka Streams.
> > > > > > >>>
> > > > > > >>> I took a quick pass over it, and I have one initial question.
> > > > > > >>>
> > > > > > >>> We recently reduced overloads with KIP-182, and in this KIP we are increasing them again.
> > > > > > >>>
> > > > > > >>> I can see from the KIP why they are necessary, but I'm wondering if there is something else we can do to cut down on the overloads introduced. I don't have any sound suggestions ATM, so I'll have to think about it some more, but I wanted to put the thought out there.
> > > > > > >>>
> > > > > > >>> Thanks,
> > > > > > >>> Bill
> > > > > > >>>
> > > > > > >>> On Thu, Dec 21, 2017 at 9:06 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > > > > >>>
> > > > > > >>>> Hi all,
> > > > > > >>>>
> > > > > > >>>> I have created a KIP to add Record Headers support to the Kafka Streams API:
> > > > > > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>> The main goal is to be able to use headers to filter, map and process records as streams. Stateful processing (joins, windows) is not considered.
> > > > > > >>>>
> > > > > > >>>> Proposed changes/Draft:
> > > > > > >>>> https://github.com/apache/kafka/compare/trunk...jeqo:streams-headers
> > > > > > >>>>
> > > > > > >>>> Feedback and suggestions are more than welcome.
> > > > > > >>>>
> > > > > > >>>> Cheers,
> > > > > > >>>>
> > > > > > >>>> Jorge.
> > > > > > >>>>
> > >
> > > --
> > > -- Guozhang
>
--
-- Guozhang
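
To illustrate the behaviour discussed in this thread (headers travel with the record context, and a processor that sees the context may read, add, or remove them before the record moves downstream), here is a minimal, self-contained Java sketch. It is only a sketch under stated assumptions: `Header`, `Context`, `Step`, and `run` are hypothetical stand-ins invented for this example. They are not Kafka Streams classes and not the API proposed in the KIP.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for illustration; none of these types are Kafka classes. */
class HeaderForwardingSketch {

    /** A header is a key plus raw bytes. */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, String value) {
            this.key = key;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Stand-in record context: headers sit next to topic and offset. */
    static final class Context {
        final String topic;
        final long offset;
        final List<Header> headers = new ArrayList<>();

        Context(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }

        /** Returns the value of the last header with this key, or null if absent. */
        String lastHeader(String key) {
            String found = null;
            for (Header h : headers) {
                if (h.key.equals(key)) {
                    found = new String(h.value, StandardCharsets.UTF_8);
                }
            }
            return found;
        }
    }

    /** A processing step; it sees the record and the context that travels with it. */
    interface Step {
        void process(String key, String value, Context context);
    }

    /** Runs the steps in order, handing every step the same context (the "forward"). */
    static Context run(Context context, String key, String value, List<Step> steps) {
        for (Step step : steps) {
            step.process(key, value, context);
        }
        return context;
    }

    public static void main(String[] args) {
        Context context = new Context("input-topic", 42L);
        context.headers.add(new Header("trace-id", "abc123"));

        List<Step> steps = new ArrayList<>();
        // Step 1 only reads the header, e.g. to continue a trace.
        steps.add((k, v, ctx) -> System.out.println("trace-id = " + ctx.lastHeader("trace-id")));
        // Step 2 adds a header; later steps see it because the context is shared.
        steps.add((k, v, ctx) -> ctx.headers.add(new Header("processed-by", "step-2")));
        // Step 3 drops a header it has consumed, so it is not passed further downstream.
        steps.add((k, v, ctx) -> ctx.headers.removeIf(h -> h.key.equals("trace-id")));

        Context result = run(context, "key", "value", steps);
        System.out.println("headers left: " + result.headers.size());
    }
}
```

Because every step receives the same context, nothing has to be forwarded explicitly; whether a header survives depends only on what the steps do to it. This covers just the stateless, single-record case. What should happen to headers in joins and aggregations is the open question raised above, and the sketch does not try to answer it.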