Hi everyone!

I'd like to revamp this KIP. I have made some significant changes to its
scope:
- Added `mapRecordValue` to map not only headers but also other record
metadata (topic name, partition, offset, and timestamp) into a new type
`RecordValue<V>`.
- Added a serde for `RecordValue` to support stateful operations.
- Added `setRecordHeaders` to apply headers to records flowing through the stream.
- Added headers to `To` to update headers via `context.forward(k, v, to)`.
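
To make the `RecordValue<V>` idea concrete, here is a minimal sketch of what such a type could carry, assuming plain fields and a `String` -> `String` header map for brevity (real Kafka headers are `byte[]`-valued and may repeat keys). The field names and the `mapValue` and `describe` helpers are hypothetical and are not the KIP's actual API.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical shape of RecordValue<V>: the original value plus the metadata
// listed above (topic, partition, offset, timestamp, headers).
// Headers are simplified to a String -> String map for illustration only.
final class RecordValue<V> {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    final Map<String, String> headers;
    final V value;

    RecordValue(String topic, int partition, long offset, long timestamp,
                Map<String, String> headers, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        // Read-only copy: later changes to the caller's map are not visible here.
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.value = value;
    }

    // Hypothetical helper: transform the value while keeping all metadata.
    <R> RecordValue<R> mapValue(Function<? super V, ? extends R> f) {
        return new RecordValue<>(topic, partition, offset, timestamp, headers, f.apply(value));
    }

    // Example of downstream logic that reads the metadata.
    String describe() {
        return topic + "-" + partition + "@" + offset + ": " + value;
    }
}
```

Keeping the metadata in the value type is what makes a dedicated serde necessary once such values reach a state store.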

New link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL

Looking forward to your feedback,

Cheers and stay safe,
Jorge.

On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thanks Sophie! I haven't followed KIP-478, but it sounds great.
> I'll be happy to help with that migration to the new PAPI if it's still an
> open issue. We can bump this KIP after that.
>
> Cheers,
> Jorge.
>
> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
>> I *think* that the `To` Matthias was referring to was not KStream#to but
>> the To class, which is accepted as a possible parameter of
>> ProcessorContext#forward (correct me if I'm wrong).
>>
>> This was on the old ProcessorContext interface, which has now been
>> replaced with the new api.ProcessorContext in KIP-478. In the new
>> interface we've moved away from the forward signatures that accept a
>> separate key or value or timestamp or To, and wrapped all of these into a
>> single Record class. This new Record class has the headers as a field, so
>> it seems like KIP-478 has happened to solve the lack of support for
>> Headers in the PAPI along the way.
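
To illustrate a single immutable record object that carries key, value, timestamp and headers together, here is a self-contained sketch. `SimpleRecord`, `withValue` and `withHeader` are hypothetical names for this example, not the actual `org.apache.kafka.streams.processor.api.Record` API, and headers are simplified to a `String` -> `String` map.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified immutable record bundling key, value, timestamp
// and headers, so one forward(record) call carries all of them.
// NOT the real Kafka class; headers are a String -> String map here.
final class SimpleRecord<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;
    private final Map<String, String> headers;

    SimpleRecord(K key, V value, long timestamp, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Map<String, String> headers() { return headers; }

    // Returns a new record with the given value; this record is left untouched.
    <NV> SimpleRecord<K, NV> withValue(NV newValue) {
        return new SimpleRecord<>(key, newValue, timestamp, headers);
    }

    // Returns a new record with one header added or replaced.
    SimpleRecord<K, V> withHeader(String name, String headerValue) {
        Map<String, String> copy = new LinkedHashMap<>(headers);
        copy.put(name, headerValue);
        return new SimpleRecord<>(key, value, timestamp, copy);
    }
}
```

Because every `with...` call returns a new object, two outputs derived from the same input cannot overwrite each other's headers.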
>>
>> This is all somewhat recent, and probably wasn't yet sorted out at the
>> time of Matthias' last reply. But given how this worked out, it seems like
>> we can just focus on adding support for Headers in the DSL in this KIP by
>> building off of the groundwork of KIP-478? It doesn't seem necessary to go
>> back and add support for headers in the old PAPI, since this will be (or
>> already has been?) deprecated.
>>
>> The one challenge is that this will presumably require that we migrate
>> all DSL operators to the new PAPI before adding header support for those
>> operators. But that definitely sounds achievable here.
>>
>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
>> quilcate.jo...@gmail.com> wrote:
>>
>> > Hi Matthias,
>> >
>> > Sorry for the late reply.
>> >
>> > I like the proposal. Just to check if I got it right:
>> >
>> > We can extend the `kstream.to()` function to support setting headers.
>> > e.g.:
>> >
>> > ```
>> >     void to(final String topic,
>> >             final Produced<K, V> produced,
>> >             final HeadersExtractor<K, V> headersExtractor);
>> > ```
>> >
>> > where `HeadersExtractor`:
>> >
>> > ```
>> > public interface HeadersExtractor<K, V> {
>> >     Headers extract(final K key, final V value, final RecordContext recordContext);
>> > }
>> > ```
>> >
>> > This would require changing `Topology#addSink()` to support this
>> > extractor as well.
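
A self-contained sketch of how an extractor with this shape might be implemented. `SimpleRecordContext` is a hypothetical stand-in for `RecordContext`, headers are simplified to a `String` -> `String` map instead of Kafka's `Headers`, and the header name `origin-topic` is made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for RecordContext: only what this example needs.
interface SimpleRecordContext {
    String topic();
    Map<String, String> headers();
}

// Mirrors the shape of the proposed HeadersExtractor: compute the output
// headers from the key, the value and the input record's context.
@FunctionalInterface
interface HeadersExtractor<K, V> {
    Map<String, String> extract(K key, V value, SimpleRecordContext recordContext);
}

final class Extractors {
    // Example extractor: keep the input headers and tag the output with the
    // topic it was read from ("origin-topic" is a made-up header name).
    static <K, V> HeadersExtractor<K, V> withOriginTopic() {
        return (key, value, ctx) -> {
            // Copy rather than mutate the input headers.
            Map<String, String> out = new LinkedHashMap<>(ctx.headers());
            out.put("origin-topic", ctx.topic());
            return out;
        };
    }
}
```

Returning a fresh header set from the extractor, rather than mutating the context's headers, keeps the sink's output independent of any other record derived from the same input.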
>> >
>> > If this is aligned with your proposal, I'm happy to add it to this KIP.
>> >
>> > Cheers,
>> > Jorge.
>> >
>> > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>> >
>> > > Jorge,
>> > >
>> > > thanks a lot for this KIP. Being able to modify headers is a very
>> > > valuable feature.
>> > >
>> > > However, before we actually expose them in the DSL, I am wondering if
>> > > we should improve how headers can be modified in the PAPI. Currently,
>> > > it is possible but very clumsy to work with headers in the Processor
>> > > API, for two reasons:
>> > >
>> > >  (1) There is no default `Headers` implementation in the public API
>> > >  (2) There is no explicit way to set headers for output records
>> > >
>> > > Currently, the input record headers are copied into the output records
>> > > when `forward()` is called. However, it's not a deep copy; we just copy
>> > > the reference. This means that one needs to work with a single mutable
>> > > object that flows through multiple processors, which is very error
>> > > prone.
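
A plain-Java illustration of the shared-reference problem described above; this is not Kafka code. `SharedHeadersDemo` and `Output` are hypothetical, headers are simplified to a `List<String>`, and the downstream step is assumed to keep references to the records it receives (for example, by buffering them). Under that assumption, the header added for the first output is gone by the time the second output is emitted.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Kafka code): outputs hold the headers by reference,
// and a downstream step buffers the outputs it receives.
final class SharedHeadersDemo {
    static final class Output {
        final String value;
        final List<String> headers; // held by reference, not copied

        Output(String value, List<String> headers) {
            this.value = value;
            this.headers = headers;
        }
    }

    // Emits two outputs that share the input's headers object, following an
    // add / emit / remove / add / emit sequence.
    static List<Output> emitTwo(List<String> inputHeaders) {
        List<Output> buffered = new ArrayList<>();
        inputHeaders.add("branch=a");
        buffered.add(new Output("first", inputHeaders));
        inputHeaders.remove("branch=a");
        inputHeaders.add("branch=b");
        buffered.add(new Output("second", inputHeaders));
        return buffered;
    }
}
```

Both buffered outputs end up pointing at the same list, so the first one reports `branch=b`.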
>> > >
>> > > Furthermore, if you want to emit multiple output records and, for
>> > > example, add a different header to each output record (based on the
>> > > same input headers), you would need to do something like this:
>> > >
>> > >   Headers h = context.headers();
>> > >   h.add(...);
>> > >   context.forward(...);
>> > >   // remove the header you added for the first output record
>> > >   h.remove(...);
>> > >   h.add(...);
>> > >   context.forward(...);
>> > >
>> > >
>> > > Maybe we could extend `To` to allow passing in a new `Headers` object
>> > > (or an `Iterable<Header>`, similar to `ProducerRecord`)? We could
>> > > either add it to your KIP or do a new KIP just for the PAPI.
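
A sketch of what a `To` that carries headers might look like. It assumes a hypothetical `ToWithHeaders` class and a fake `RecordingContext` standing in for `ProcessorContext#forward(key, value, to)`; neither exists in Kafka, and headers are simplified to a `String` -> `String` map. Each forward gets its own immutable header set, so the remove/add sequence on a shared object is not needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical To-like class that also carries output headers.
// Not an existing Kafka API; headers are a String -> String map here.
final class ToWithHeaders {
    final String childName; // null means "forward to all children"
    final Map<String, String> headers;

    private ToWithHeaders(String childName, Map<String, String> headers) {
        this.childName = childName;
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
    }

    static ToWithHeaders all() { return new ToWithHeaders(null, Map.of()); }

    static ToWithHeaders child(String name) { return new ToWithHeaders(name, Map.of()); }

    // Returns a new instance; the receiver is left unchanged.
    ToWithHeaders withHeaders(Map<String, String> newHeaders) {
        return new ToWithHeaders(childName, newHeaders);
    }
}

// Fake context that only records what was forwarded.
final class RecordingContext {
    final List<String> forwardedValues = new ArrayList<>();
    final List<Map<String, String>> forwardedHeaders = new ArrayList<>();

    void forward(String key, String value, ToWithHeaders to) {
        forwardedValues.add(value);
        forwardedHeaders.add(to.headers);
    }
}

final class BranchingProcessor {
    // Emits two outputs derived from the same input headers; each output
    // gets its own header set.
    static void process(RecordingContext ctx, String key, String value,
                        Map<String, String> inputHeaders) {
        Map<String, String> first = new LinkedHashMap<>(inputHeaders);
        first.put("branch", "a");
        ctx.forward(key, value, ToWithHeaders.all().withHeaders(first));

        Map<String, String> second = new LinkedHashMap<>(inputHeaders);
        second.put("branch", "b");
        ctx.forward(key, value, ToWithHeaders.all().withHeaders(second));
    }
}
```

Making the headers part of an immutable `To` value means the caller never has to undo a change made for a previous output.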
>> > >
>> > > Thoughts?
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
>> > > > Hi everyone,
>> > > >
>> > > > Bumping this thread to check if there's any feedback.
>> > > >
>> > > > Cheers,
>> > > > Jorge.
>> > > >
>> > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
>> > > > quilcate.jo...@gmail.com> wrote:
>> > > >
>> > > >> Hi everyone,
>> > > >>
>> > > >> I would like to start the discussion for KIP-634:
>> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>> > > >>
>> > > >> Looking forward to your feedback.
>> > > >>
>> > > >> Thanks!
>> > > >> Jorge.
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >
>> > >
>> > >
>> >
>>
>
