Thanks Sophie! Haven't followed KIP-478 but sounds great. I'll be happy to help on that migration to the new PAPI if it's still an open issue. We can bump this KIP after that.
Cheers, Jorge. On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io> wrote: > I *think* that the `To` Matthias was referring to was not KStream#to but > the To class > which is accepted as a possible parameter of ProcessorContext#forward > (correct > me if wrong). > > This was on the old ProcessorContext interface, which has now been > replaced with > the new api.ProcessorContext in KIP-478. In the new interface we've moved > away > from the forward signatures that accept a separate key or value or > timestamp or To, > and wrapped all of these into a single Record class. This new Record class > has the > headers as a field, so it seems like KIP-478 has happened to solve the lack > of support > for Headers in the PAPI along the way. > > This is all somewhat recent, and probably wasn't yet sorted out at the time > of Matthias' > last reply. But given how this worked out it seems like we can just focus > on adding > support for Headers in the DSL in this KIP by building off of the > groundwork of > KIP-478? It doesn't seem necessary to go back and add support for headers > in the old > PAPI, since this will (or already has?) been deprecated. > > The one challenge is that this will presumably require that we migrate all > DSL operators > to the new PAPI before adding header support for those operators. But that > definitely > sounds achievable here > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya < > quilcate.jo...@gmail.com> wrote: > > > Hi Matthias, > > > > Sorry for the late reply. > > > > I like the proposal. Just to check if I got it right: > > > > We can extend the `kstream.to()` function to support setting headers. > > e.g.: > > > > ``` > > void to(final String topic, > > final Produced<K, V> produced, > > final HeadersExtractor<K, V> headersExtractor); > > ``` > > > > where `HeadersExtractor`: > > > > ``` > > public interface HeadersExtractor<K, V> { > > Headers extract(final K key, final V value, final RecordContext > > recordContext); > > } > > ``` > > > > This would require to change `Topology#addSink()` to support this > > extractor as well. > > > > If this is aligned with your proposal, I'm happy to add it to this KIP. > > > > Cheers, > > Jorge. > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> > wrote: > > > > > Jorge, > > > > > > thanks a lot for this KIP. Being able to modify headers is a very > > > valuable feature. > > > > > > However, before we actually expose them in the DSL, I am wondering if > we > > > should improve how headers can be modified in the PAPI? Currently, it > is > > > possible but very clumsy to work with headers in the Processor API, > > > because of two reasons: > > > > > > (1) There is no default `Headers` implementation in the public API > > > (2) There is no explicit way to set headers for output records > > > > > > Currently, the input record headers are copied into the output records > > > when `forward()` is called, however, it's not really a deep copy but we > > > just copy the reference. This implies that one needs to work with a > > > single mutable object that flows through multiple processors making it > > > very error prone. > > > > > > Furthermore, if you want to emit multiple output records, and for > > > example want to add two different headers to the output record (based > on > > > the same input headers), you would need to do something like this: > > > > > > Headers h = context.headers(); > > > h.add(...); > > > context.forward(...); > > > // remove the header you added for the first output record > > > h.remove(...); > > > h.add(...); > > > context.forward(...); > > > > > > > > > Maybe we could extend `To` to allow passing in a new `Headers` object > > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either > > > add it to your KIP or do a new KIP just for the PAPI. > > > > > > Thoughts? > > > > > > > > > -Matthias > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote: > > > > Hi everyone, > > > > > > > > Bumping this thread to check if there's any feedback. > > > > > > > > Cheers, > > > > Jorge. > > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya < > > > > quilcate.jo...@gmail.com> wrote: > > > > > > > >> Hi everyone, > > > >> > > > >> I would like to start the discussion for KIP-634: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL > > > >> > > > >> Looking forward to your feedback. > > > >> > > > >> Thanks! > > > >> Jorge. > > > >> > > > >> > > > >> > > > >> > > > > > > > > > > > > >