Hi Matthias,

Sorry for the late reply.

I like the proposal. Just to check that I got it right: we can extend the
`kstream.to()` function to support setting headers, e.g.:

```
void to(final String topic, final Produced<K, V> produced, final HeadersExtractor<K, V> headersExtractor);
```

where `HeadersExtractor` is:

```
public interface HeadersExtractor<K, V> {
    Headers extract(final K key, final V value, final RecordContext recordContext);
}
```

This would require changing `Topology#addSink()` to support this extractor
as well.

If this is aligned with your proposal, I'm happy to add it to this KIP.

Cheers,
Jorge.

On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:

> Jorge,
>
> thanks a lot for this KIP. Being able to modify headers is a very
> valuable feature.
>
> However, before we actually expose them in the DSL, I am wondering if we
> should improve how headers can be modified in the PAPI? Currently, it is
> possible but very clumsy to work with headers in the Processor API,
> because of two reasons:
>
> (1) There is no default `Headers` implementation in the public API
> (2) There is no explicit way to set headers for output records
>
> Currently, the input record headers are copied into the output records
> when `forward()` is called, however, it's not really a deep copy but we
> just copy the reference. This implies that one needs to work with a
> single mutable object that flows through multiple processors making it
> very error prone.
>
> Furthermore, if you want to emit multiple output records, and for
> example want to add two different headers to the output record (based on
> the same input headers), you would need to do something like this:
>
>   Headers h = context.headers();
>   h.add(...);
>   context.forward(...);
>   // remove the header you added for the first output record
>   h.remove(...);
>   h.add(...);
>   context.forward(...);
>
>
> Maybe we could extend `To` to allow passing in a new `Headers` object
> (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> add it to your KIP or do a new KIP just for the PAPI.
>
> Thoughts?
>
>
> -Matthias
>
> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > Hi everyone,
> >
> > Bumping this thread to check if there's any feedback.
> >
> > Cheers,
> > Jorge.
> >
> > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to start the discussion for KIP-634:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Thanks!
> >> Jorge.
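
P.S. To make the `HeadersExtractor` idea from the top of this message a bit
more concrete, here is a rough, self-contained sketch. It does not use the
Kafka libraries: `SimpleHeader`, `SimpleRecordContext` and `headersFor` are
hypothetical stand-ins I made up for illustration (for `Header`,
`RecordContext` and whatever the sink node would do internally), and the
extractor returns a plain `List` instead of `Headers`. The only point is that
each output record gets its own freshly built headers, so nothing mutable is
shared between records.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: none of these types are the real Kafka Streams API.
class HeadersExtractorSketch {

    // Hypothetical stand-in for Kafka's Header (string value for simplicity).
    static final class SimpleHeader {
        final String key;
        final String value;

        SimpleHeader(final String key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for RecordContext, reduced to three fields.
    static final class SimpleRecordContext {
        final String topic;
        final int partition;
        final long offset;

        SimpleRecordContext(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Same shape as the interface proposed above, but returning a List.
    interface HeadersExtractor<K, V> {
        List<SimpleHeader> extract(K key, V value, SimpleRecordContext recordContext);
    }

    // Assumed sink-side behaviour: ask the extractor once per output record
    // and copy the result, so callers never share a mutable headers object.
    static <K, V> List<SimpleHeader> headersFor(final K key,
                                                final V value,
                                                final SimpleRecordContext ctx,
                                                final HeadersExtractor<K, V> extractor) {
        return new ArrayList<>(extractor.extract(key, value, ctx));
    }

    public static void main(final String[] args) {
        // Derive one header from the record context of the input record.
        final HeadersExtractor<String, String> extractor =
            (k, v, c) -> List.of(new SimpleHeader("source-topic", c.topic));

        final SimpleRecordContext ctx = new SimpleRecordContext("input", 0, 42L);
        final List<SimpleHeader> headers = headersFor("k1", "v1", ctx, extractor);

        for (final SimpleHeader h : headers) {
            System.out.println(h.key + "=" + h.value);
        }
    }
}
```

With something along these lines, the add/remove dance on a shared `Headers`
object from your example would not be needed on the DSL side, since the
headers are computed per record rather than mutated in place.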