Hi Matthias,

Sorry for the late reply.

I like the proposal. Just to check if I got it right:

We can extend the `kstream.to()` function to support setting headers. e.g.:

```
    void to(final String topic,
            final Produced<K, V> produced,
            final HeadersExtractor<K, V> headersExtractor);
```

where `HeadersExtractor`:

```
public interface HeadersExtractor<K, V> {
    Headers extract(final K key, final V value, final RecordContext
recordContext);
}
```

 This would require to change `Topology#addSink()` to support this
extractor as well.

If this is aligned with your proposal, I'm happy to add it to this KIP.

Cheers,
Jorge.

On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:

> Jorge,
>
> thanks a lot for this KIP. Being able to modify headers is a very
> valuable feature.
>
> However, before we actually expose them in the DSL, I am wondering if we
> should improve how headers can be modified in the PAPI? Currently, it is
> possible but very clumsy to work with headers in the Processor API,
> because of two reasons:
>
>  (1) There is no default `Headers` implementation in the public API
>  (2) There is no explicit way to set headers for output records
>
> Currently, the input record headers are copied into the output records
> when `forward()` is called, however, it's not really a deep copy but we
> just copy the reference. This implies that one needs to work with a
> single mutable object that flows through multiple processors making it
> very error prone.
>
> Furthermore, if you want to emit multiple output records, and for
> example want to add two different headers to the output record (based on
> the same input headers), you would need to do something like this:
>
>   Headers h = context.headers();
>   h.add(...);
>   context.forward(...);
>   // remove the header you added for the first output record
>   h.remove(...);
>   h.add(...);
>   context.forward(...);
>
>
> Maybe we could extend `To` to allow passing in a new `Headers` object
> (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> add it to your KIP or do a new KIP just for the PAPI.
>
> Thoughts?
>
>
> -Matthias
>
> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > Hi everyone,
> >
> > Bumping this thread to check if there's any feedback.
> >
> > Cheers,
> > Jorge.
> >
> > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to start the discussion for KIP-634:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Thanks!
> >> Jorge.
> >>
> >>
> >>
> >>
> >
>
>

Reply via email to