I *think* the `To` Matthias was referring to was not KStream#to but the To
class, which is accepted as a possible parameter of ProcessorContext#forward
(correct me if I'm wrong).

This was on the old ProcessorContext interface, which has now been replaced
with the new api.ProcessorContext in KIP-478. In the new interface we've moved
away from the forward signatures that accept a separate key, value, timestamp,
or To, and wrapped all of these into a single Record class. This new Record
class has the headers as a field, so it seems like KIP-478 happened to solve
the lack of support for Headers in the PAPI along the way.
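
As a rough illustration of why carrying the headers on the record itself helps,
here is a toy Java model. `SimpleHeader`, `SimpleRecord`, `withHeader` and
`fanOut` are hypothetical stand-ins made up for this sketch, not the actual
KIP-478 classes or signatures. It only shows the general idea: each output gets
its own headers derived from the input, with no shared mutable object to patch
up between forwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model only: SimpleHeader and SimpleRecord are hypothetical stand-ins,
// not the Kafka Streams Header/Record classes.
final class HeadersSketch {

    // A header reduced to a key and a String value for readability.
    static final class SimpleHeader {
        final String key;
        final String value;

        SimpleHeader(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // An immutable record that carries its own headers.
    static final class SimpleRecord<K, V> {
        final K key;
        final V value;
        final long timestamp;
        private final List<SimpleHeader> headers;

        SimpleRecord(K key, V value, long timestamp, List<SimpleHeader> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            // Defensive copy, so later changes to the caller's list do not leak in.
            this.headers = Collections.unmodifiableList(new ArrayList<>(headers));
        }

        List<SimpleHeader> headers() {
            return headers;
        }

        // Returns a new record with one more header; this record is unchanged.
        SimpleRecord<K, V> withHeader(String headerKey, String headerValue) {
            List<SimpleHeader> copy = new ArrayList<>(headers);
            copy.add(new SimpleHeader(headerKey, headerValue));
            return new SimpleRecord<>(key, value, timestamp, copy);
        }
    }

    // One input, two outputs, each with a different extra header.
    static <K, V> List<SimpleRecord<K, V>> fanOut(SimpleRecord<K, V> input) {
        return Arrays.asList(
            input.withHeader("branch", "a"),
            input.withHeader("branch", "b"));
    }

    public static void main(String[] args) {
        SimpleRecord<String, String> input = new SimpleRecord<>(
            "k", "v", 0L, Collections.singletonList(new SimpleHeader("trace", "1")));
        for (SimpleRecord<String, String> out : fanOut(input)) {
            System.out.println(out.headers());
        }
    }
}
```

Compared with the add/remove sequence on `context.headers()` quoted further
down, nothing has to be undone between the two outputs, and the input record's
headers are left untouched.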

This is all somewhat recent, and probably wasn't yet sorted out at the time of
Matthias' last reply. But given how this worked out, it seems like we can just
focus on adding support for Headers in the DSL in this KIP by building off of
the groundwork of KIP-478? It doesn't seem necessary to go back and add support
for headers in the old PAPI, since this will be (or already has been?)
deprecated.

The one challenge is that this will presumably require that we migrate all DSL
operators to the new PAPI before adding header support for those operators. But
that definitely sounds achievable here.

On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Matthias,
>
> Sorry for the late reply.
>
> I like the proposal. Just to check if I got it right:
>
> We can extend the `kstream.to()` function to support setting headers.
> e.g.:
>
> ```
>     void to(final String topic,
>             final Produced<K, V> produced,
>             final HeadersExtractor<K, V> headersExtractor);
> ```
>
> where `HeadersExtractor`:
>
> ```
> public interface HeadersExtractor<K, V> {
>     Headers extract(final K key, final V value, final RecordContext recordContext);
> }
> ```
>
> This would require changing `Topology#addSink()` to support this
> extractor as well.
>
> If this is aligned with your proposal, I'm happy to add it to this KIP.
>
> Cheers,
> Jorge.
>
> On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Jorge,
> >
> > thanks a lot for this KIP. Being able to modify headers is a very
> > valuable feature.
> >
> > However, before we actually expose them in the DSL, I am wondering if we
> > should improve how headers can be modified in the PAPI? Currently, it is
> > possible but very clumsy to work with headers in the Processor API,
> > because of two reasons:
> >
> >  (1) There is no default `Headers` implementation in the public API
> >  (2) There is no explicit way to set headers for output records
> >
> > Currently, the input record headers are copied into the output records
> > when `forward()` is called. However, it's not really a deep copy; we
> > just copy the reference. This implies that one needs to work with a
> > single mutable object that flows through multiple processors, making it
> > very error-prone.
> >
> > Furthermore, if you want to emit multiple output records, and for
> > example want to add two different headers to the output record (based on
> > the same input headers), you would need to do something like this:
> >
> >   Headers h = context.headers();
> >   h.add(...);
> >   context.forward(...);
> >   // remove the header you added for the first output record
> >   h.remove(...);
> >   h.add(...);
> >   context.forward(...);
> >
> >
> > Maybe we could extend `To` to allow passing in a new `Headers` object
> > (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> > add it to your KIP or do a new KIP just for the PAPI.
> >
> > Thoughts?
> >
> >
> > -Matthias
> >
> > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > > Hi everyone,
> > >
> > > Bumping this thread to check if there's any feedback.
> > >
> > > Cheers,
> > > Jorge.
> > >
> > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> I would like to start the discussion for KIP-634:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> > >>
> > >> Looking forward to your feedback.
> > >>
> > >> Thanks!
> > >> Jorge.
> > >>
> > >>
> > >>
> > >>
> > >
> >
> >
>
