Hi everyone!

I'd like to revamp this KIP. I have made some significant changes to the scope:

- Added `mapRecordValue` to map not only headers but also other record metadata (topic name, partition, offset, and timestamp) into a new type, `RecordValue<V>`.
- Added a serde for `RecordValue` to support stateful operations.
- Added `setRecordHeaders` to apply headers to records flowing through the stream.
- Added headers to `To` so headers can be updated via `context.forward(k, v, to)`.
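For readers skimming the thread, here is one possible shape for `RecordValue<V>`. It is only a sketch under stated assumptions, not the definition on the KIP page. Plain JDK types stand in for Kafka's `Headers`, headers are simplified to one value per key (real Kafka headers allow repeated keys), and the `withValue` helper is hypothetical. It shows the core idea of the first bullet: the value travels together with the topic, partition, offset, timestamp and headers of the record it came from.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the RecordValue<V> idea: a value bundled with the
// metadata of the record it was read from. A Map stands in for Kafka's
// Headers interface (simplified: one value per key) so this compiles alone.
final class RecordValue<V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final Map<String, byte[]> headers;
    private final V value;

    RecordValue(String topic, int partition, long offset, long timestamp,
                Map<String, byte[]> headers, V value) {
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        // Defensive, read-only copy: later changes to the caller's map are not visible here.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.value = value;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }
    Map<String, byte[]> headers() { return headers; }
    V value() { return value; }

    // Hypothetical helper: a new RecordValue with a different value but the
    // same metadata, as a mapValues-style step might produce.
    <VR> RecordValue<VR> withValue(VR newValue) {
        return new RecordValue<>(topic, partition, offset, timestamp, headers, newValue);
    }
}
```

Because the wrapper is immutable and carries all of the metadata, a stateful operator could store and restore the whole `RecordValue` through a serde, not just `V`, which is presumably what the second bullet is after.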
New link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

Looking forward to your feedback.

Cheers and stay safe,
Jorge.

On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

> Thanks Sophie! I haven't followed KIP-478, but it sounds great.
> I'll be happy to help with that migration to the new PAPI if it's still an open issue. We can bump this KIP after that.
>
> Cheers,
> Jorge.
>
> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
>> I *think* that the `To` Matthias was referring to was not KStream#to but the To class, which is accepted as a possible parameter of ProcessorContext#forward (correct me if I'm wrong).
>>
>> This was on the old ProcessorContext interface, which has now been replaced with the new api.ProcessorContext in KIP-478. In the new interface we've moved away from the forward signatures that accept a separate key or value or timestamp or To, and wrapped all of these into a single Record class. This new Record class has the headers as a field, so it seems like KIP-478 has happened to solve the lack of support for Headers in the PAPI along the way.
>>
>> This is all somewhat recent, and probably wasn't yet sorted out at the time of Matthias' last reply. But given how this worked out, it seems like we can just focus on adding support for Headers in the DSL in this KIP by building off of the groundwork of KIP-478? It doesn't seem necessary to go back and add support for headers in the old PAPI, since this will be (or already has been?) deprecated.
>>
>> The one challenge is that this will presumably require that we migrate all DSL operators to the new PAPI before adding header support for those operators.
>> But that definitely sounds achievable here.
>>
>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>
>> > Hi Matthias,
>> >
>> > Sorry for the late reply.
>> >
>> > I like the proposal. Just to check that I got it right:
>> >
>> > We can extend the `kstream.to()` function to support setting headers, e.g.:
>> >
>> > ```
>> > void to(final String topic,
>> >         final Produced<K, V> produced,
>> >         final HeadersExtractor<K, V> headersExtractor);
>> > ```
>> >
>> > where `HeadersExtractor` is:
>> >
>> > ```
>> > public interface HeadersExtractor<K, V> {
>> >     Headers extract(final K key, final V value, final RecordContext recordContext);
>> > }
>> > ```
>> >
>> > This would require changing `Topology#addSink()` to support this extractor as well.
>> >
>> > If this is aligned with your proposal, I'm happy to add it to this KIP.
>> >
>> > Cheers,
>> > Jorge.
>> >
>> > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:
>> >
>> > > Jorge,
>> > >
>> > > thanks a lot for this KIP. Being able to modify headers is a very valuable feature.
>> > >
>> > > However, before we actually expose them in the DSL, I am wondering if we should improve how headers can be modified in the PAPI? Currently, it is possible but very clumsy to work with headers in the Processor API, for two reasons:
>> > >
>> > > (1) There is no default `Headers` implementation in the public API
>> > > (2) There is no explicit way to set headers for output records
>> > >
>> > > Currently, the input record headers are copied into the output records when `forward()` is called; however, it's not really a deep copy: we just copy the reference. This implies that one needs to work with a single mutable object that flows through multiple processors, making it very error-prone.
>> > >
>> > > Furthermore, if you want to emit multiple output records, and for example want to add two different headers to the output record (based on the same input headers), you would need to do something like this:
>> > >
>> > > Headers h = context.headers();
>> > > h.add(...);
>> > > context.forward(...);
>> > > // remove the header you added for the first output record
>> > > h.remove(...);
>> > > h.add(...);
>> > > context.forward(...);
>> > >
>> > > Maybe we could extend `To` to allow passing in a new `Headers` object (or an `Iterable<Header>`, similar to `ProducerRecord`)? We could either add it to your KIP or do a new KIP just for the PAPI.
>> > >
>> > > Thoughts?
>> > >
>> > > -Matthias
>> > >
>> > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
>> > > > Hi everyone,
>> > > >
>> > > > Bumping this thread to check if there's any feedback.
>> > > >
>> > > > Cheers,
>> > > > Jorge.
>> > > >
>> > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>> > > >
>> > > >> Hi everyone,
>> > > >>
>> > > >> I would like to start the discussion for KIP-634:
>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>> > > >>
>> > > >> Looking forward to your feedback.
>> > > >>
>> > > >> Thanks!
>> > > >> Jorge.
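Matthias's point about shared header references can be made concrete with a small, self-contained sketch. It does not use the Kafka Streams API: `OutRecord` and `HeaderFanOut` are hypothetical names, a `Map<String, String>` stands in for `Headers`, and a `List` stands in for `context.forward(...)`. The first method reuses one mutable map across outputs, as in his snippet; the second gives each output its own copy of the input headers, which is the behavior a per-record headers field (like the one on KIP-478's `Record` class) makes natural.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical output record: holds whatever header map it is given.
final class OutRecord {
    final String value;
    final Map<String, String> headers;

    OutRecord(String value, Map<String, String> headers) {
        this.value = value;
        this.headers = headers;
    }
}

final class HeaderFanOut {
    // Shared-reference style: one mutable map (the input's own) is reused for
    // every output, so any record that keeps a reference sees later mutations.
    static List<OutRecord> sharedHeaders(Map<String, String> input) {
        List<OutRecord> forwarded = new ArrayList<>(); // stands in for context.forward(...)
        Map<String, String> h = input;                  // same reference, not a copy
        h.put("branch", "a");
        forwarded.add(new OutRecord("first", h));
        h.remove("branch");                             // undo the header for the first output
        h.put("branch", "b");
        forwarded.add(new OutRecord("second", h));
        return forwarded;
    }

    // Copy-per-output style: each output starts from a fresh copy of the
    // input headers, so outputs are independent and the input is untouched.
    static List<OutRecord> copiedHeaders(Map<String, String> input) {
        List<OutRecord> forwarded = new ArrayList<>();
        for (String branch : new String[] {"a", "b"}) {
            Map<String, String> h = new LinkedHashMap<>(input);
            h.put("branch", branch);
            forwarded.add(new OutRecord("out-" + branch, h));
        }
        return forwarded;
    }
}
```

With the shared map, anything that holds on to the first output (a buffer, a cache, a processor that reads it later) sees `branch=b`, and the input's headers were modified as well. With the copies, each output keeps its own `branch` value and the input stays unchanged.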