Thanks for the update, Jorge,

I've just looked over the KIP again. Just one more small
concern:

5) We can't just change the type of Record#headers() to a
new fully qualified type. That would be a source-
incompatible breaking change for users.

Our options are:
* Deprecate the existing method and create a new one with
the new type
* If the existing Headers is "not great but ok", then maybe
we leave it alone.
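
To make the first option concrete, here is a rough sketch of the
deprecate-and-add pattern. It uses stand-in types only: OldHeaders,
NewHeaders, this Record class, and the method name recordHeaders() are
all made up for illustration. They are not the real Streams classes and
not something the KIP specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeadersMigrationSketch {

    // Stand-in for the existing headers type (hypothetical, not Kafka's class).
    static final class OldHeaders {
        final Map<String, String> entries = new LinkedHashMap<>();
    }

    // Stand-in for the new headers type (hypothetical).
    static final class NewHeaders {
        private final Map<String, String> entries;

        NewHeaders(Map<String, String> source) {
            // Defensive copy so the new view does not share mutable state.
            this.entries = new LinkedHashMap<>(source);
        }

        String get(String key) {
            return entries.get(key);
        }
    }

    // Stand-in for Record (hypothetical).
    static final class Record {
        private final OldHeaders headers;

        Record(OldHeaders headers) {
            this.headers = headers;
        }

        // Existing accessor keeps its return type, so current callers still compile.
        @Deprecated
        OldHeaders headers() {
            return headers;
        }

        // New accessor exposing the new type; the name is an assumption.
        NewHeaders recordHeaders() {
            return new NewHeaders(headers.entries);
        }
    }

    public static void main(String[] args) {
        OldHeaders old = new OldHeaders();
        old.entries.put("trace-id", "abc");
        Record record = new Record(old);

        // Old call site: unchanged, only a deprecation warning at compile time.
        OldHeaders legacy = record.headers();
        // New call site: opts in to the new type explicitly.
        NewHeaders migrated = record.recordHeaders();

        System.out.println(legacy.entries.get("trace-id"));
        System.out.println(migrated.get("trace-id"));
    }
}
```

Changing the return type of headers() in place would instead break any
caller that assigns the result to a variable of the old type.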

Thanks,
-John


On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate
Otoya wrote:
> John and team,
> 
> The following changes have been applied to the KIP following your feedback:
> 
> - Leverage `Record<K, V>` instead of introducing a new type
> (`RecordValue<V>`).
> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as value.
> - Extend `Record<K, V>` to:
>   - Implement `RecordMetadata` to expose `topic`, `partition`, and `offset`
>   - Use the `Headers` abstraction introduced in this KIP instead of the core one
> 
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> 
> Looking forward to your feedback.
> 
> Have a great weekend!
> 
> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
> 
> > > What do you think about instead adding topic and
> > partition to Record?
> > 
> > This is a very interesting idea. Forgot to consider this addition from
> > KIP-478.
> > 
> > `Record` would also require `offset`. Maybe implementing `RecordMetadata`
> > and adding these fields as part of the constructor to keep them immutable
> > in comparison to the other fields?
> > It would also need to change `Record`'s headers type to the new one
> > proposed on this KIP.
> > 
> > Let me explore this approach in more detail and update the KIP.
> > 
> > > I find the name "mapRecordValue" to be a bit confusing
> >   because it seems like it would map the value of a record.
> >   What do you think about "mapValueToRecord" instead?
> > 
> > Agree. It will depend on how we solve 1). If we end up using `Record` then
> > `mapValueToRecord` will make even more sense.
> > 
> > > I agree with adding the serde explicitly. However, it
> > would be good to state whether and when we'll automatically
> > wrap a value serde. For example, if the value serde is known
> > (or if we're using a default serde from the config), will
> > Streams automatically wrap it downstream of the record-
> > mapping operator?
> > 
> > Good point. The goal is as you describe it: only when `mapValueToRecord`
> > is called, the Serde will be implicitly applied.
> > Will make this explicit on the KIP.
> > 
> > 
> > On Wed, 9 Feb 2022 at 20:05, John Roesler <vvcep...@apache.org> wrote:
> > 
> > > Hello Jorge,
> > > 
> > > Thanks for bringing this up again!
> > > 
> > > I've just read over the current version of the KIP.
> > > 
> > > 1) I wonder if we really need RecordValue, since we now have
> > > Record, and they are almost the same, both in API and in
> > > purpose. What do you think about instead adding topic and
> > > partition to Record?
> > > 
> > > 2) I find the name "mapRecordValue" to be a bit confusing
> > > because it seems like it would map the value of a record.
> > > What do you think about "mapValueToRecord" instead?
> > > 
> > > 3) I agree with adding the serde explicitly. However, it
> > > would be good to state whether and when we'll automatically
> > > wrap a value serde. For example, if the value serde is known
> > > (or if we're using a default serde from the config), will
> > > Streams automatically wrap it downstream of the record-
> > > mapping operator?
> > > 
> > > Otherwise, your proposal looks good to me!
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> > > Otoya wrote:
> > > > Hi Dev team,
> > > > 
> > > > I'd like to revamp the KIP again:
> > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > > 
> > > > - Reference implementation is now using the latest `Processor` API from
> > > > KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> > > > Processors backing changes on the KStream API.
> > > > - It is proposing to still extend `To` class for backwards
> > > compatibility.
> > > > 
> > > > Looking forward to your feedback.
> > > > 
> > > > Regards,
> > > > Jorge.
> > > > 
> > > > On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jo...@gmail.com> wrote:
> > > > 
> > > > > Hi everyone!
> > > > > 
> > > > > I'd like to revamp this KIP. I have made some significant changes on
> > > the
> > > > > scope:
> > > > > - Added `mapRecordValue` to map not only headers, but other record
> > > > > metadata: topic name, partition, offset, and timestamp into a new type
> > > > > `RecordValue<V>`.
> > > > > - Added a serde for `RecordValue` to support stateful operations.
> > > > > - Added `setRecordHeaders` to apply headers to record crossing the
> > > stream.
> > > > > - Added headers to `To` to update headers via `context.forward(k, v,
> > > to)`.
> > > > > 
> > > > > New link:
> > > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > > > 
> > > > > Looking forward to your feedback,
> > > > > 
> > > > > Cheers and stay safe,
> > > > > Jorge.
> > > > > 
> > > > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > > 
> > > > > > Thanks Sophie! Haven't followed KIP-478 but sounds great.
> > > > > > I'll be happy to help on that migration to the new PAPI if it's
> > > still an
> > > > > > open issue. We can bump this KIP after that.
> > > > > > 
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > > 
> > > > > > On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> > > sop...@confluent.io>
> > > > > > wrote:
> > > > > > 
> > > > > > > I *think* that the `To` Matthias was referring to was not
> > > KStream#to but
> > > > > > > the To class
> > > > > > > which is accepted as a possible parameter of
> > > ProcessorContext#forward
> > > > > > > (correct
> > > > > > > me if wrong).
> > > > > > > 
> > > > > > > This was on the old ProcessorContext interface, which has now been
> > > > > > > replaced with
> > > > > > > the new api.ProcessorContext in KIP-478. In the new interface
> > > we've moved
> > > > > > > away
> > > > > > > from the forward signatures that accept a separate key or value or
> > > > > > > timestamp or To,
> > > > > > > and wrapped all of these into a single Record class. This new
> > > Record
> > > > > > > class
> > > > > > > has the
> > > > > > > headers as a field, so it seems like KIP-478 has happened to
> > > solve the
> > > > > > > lack
> > > > > > > of support
> > > > > > > for Headers in the PAPI along the way.
> > > > > > > 
> > > > > > > This is all somewhat recent, and probably wasn't yet sorted out
> > > at the
> > > > > > > time
> > > > > > > of Matthias'
> > > > > > > last reply. But given how this worked out it seems like we can
> > > just focus
> > > > > > > on adding
> > > > > > > support for Headers in the DSL in this KIP by building off of the
> > > > > > > groundwork of
> > > > > > > KIP-478? It doesn't seem necessary to go back and add support for
> > > headers
> > > > > > > in the old
> > > > > > > PAPI, since this will (or already has?) been deprecated.
> > > > > > > 
> > > > > > > The one challenge is that this will presumably require that we
> > > migrate
> > > > > > > all
> > > > > > > DSL operators
> > > > > > > to the new PAPI before adding header support for those operators.
> > > But
> > > > > > > that
> > > > > > > definitely
> > > > > > > sounds achievable here
> > > > > > > 
> > > > > > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > > 
> > > > > > > > Hi Matthias,
> > > > > > > > 
> > > > > > > > Sorry for the late reply.
> > > > > > > > 
> > > > > > > > I like the proposal. Just to check if I got it right:
> > > > > > > > 
> > > > > > > > We can extend the `kstream.to()` function to support setting
> > > headers.
> > > > > > > > e.g.:
> > > > > > > > 
> > > > > > > > ```
> > > > > > > >     void to(final String topic,
> > > > > > > >             final Produced<K, V> produced,
> > > > > > > >             final HeadersExtractor<K, V> headersExtractor);
> > > > > > > > ```
> > > > > > > > 
> > > > > > > > where `HeadersExtractor`:
> > > > > > > > 
> > > > > > > > ```
> > > > > > > > public interface HeadersExtractor<K, V> {
> > > > > > > >     Headers extract(final K key, final V value, final
> > > RecordContext
> > > > > > > > recordContext);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > > 
> > > > > > > >  This would require to change `Topology#addSink()` to support
> > > this
> > > > > > > > extractor as well.
> > > > > > > > 
> > > > > > > > If this is aligned with your proposal, I'm happy to add it to
> > > this KIP.
> > > > > > > > 
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > > 
> > > > > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> > > mj...@apache.org>
> > > > > > > wrote:
> > > > > > > > 
> > > > > > > > > Jorge,
> > > > > > > > > 
> > > > > > > > > thanks a lot for this KIP. Being able to modify headers is a
> > > very
> > > > > > > > > valuable feature.
> > > > > > > > > 
> > > > > > > > > However, before we actually expose them in the DSL, I am
> > > wondering
> > > > > > > if we
> > > > > > > > > should improve how headers can be modified in the PAPI?
> > > Currently,
> > > > > > > it is
> > > > > > > > > possible but very clumsy to work with headers in the
> > > Processor API,
> > > > > > > > > because of two reasons:
> > > > > > > > > 
> > > > > > > > >  (1) There is no default `Headers` implementation in the
> > > public API
> > > > > > > > >  (2) There is no explicit way to set headers for output
> > > records
> > > > > > > > > 
> > > > > > > > > Currently, the input record headers are copied into the output
> > > > > > > records
> > > > > > > > > when `forward()` is called, however, it's not really a deep
> > > copy but
> > > > > > > we
> > > > > > > > > just copy the reference. This implies that one needs to work
> > > with a
> > > > > > > > > single mutable object that flows through multiple processors
> > > making
> > > > > > > it
> > > > > > > > > very error prone.
> > > > > > > > > 
> > > > > > > > > Furthermore, if you want to emit multiple output records, and
> > > for
> > > > > > > > > example want to add two different headers to the output record
> > > > > > > (based on
> > > > > > > > > the same input headers), you would need to do something like
> > > this:
> > > > > > > > > 
> > > > > > > > >   Headers h = context.headers();
> > > > > > > > >   h.add(...);
> > > > > > > > >   context.forward(...);
> > > > > > > > >   // remove the header you added for the first output record
> > > > > > > > >   h.remove(...);
> > > > > > > > >   h.add(...);
> > > > > > > > >   context.forward(...);
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > Maybe we could extend `To` to allow passing in a new
> > > `Headers` object
> > > > > > > > > (or an `Iterable<Header>` similar to `ProducerRecord`)? We
> > > could
> > > > > > > either
> > > > > > > > > add it to your KIP or do a new KIP just for the PAPI.
> > > > > > > > > 
> > > > > > > > > Thoughts?
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > -Matthias
> > > > > > > > > 
> > > > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > Hi everyone,
> > > > > > > > > > 
> > > > > > > > > > Bumping this thread to check if there's any feedback.
> > > > > > > > > > 
> > > > > > > > > > Cheers,
> > > > > > > > > > Jorge.
> > > > > > > > > > 
> > > > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> > > Otoya <
> > > > > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > > > > > 
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > > 
> > > > > > > > > > > I would like to start the discussion for KIP-634:
> > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> > > > > > > > > > > 
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Jorge.
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > > 
> > > > > > 
> > > 
> > > 
