Thanks for the update, Jorge. I've just looked over the KIP again. Just one
more small concern:

5) We can't just change the type of Record#headers() to a new fully
qualified type. That would be a source-incompatible breaking change for
users. Our options are:
* Deprecate the existing method and create a new one with the new type
* If the existing Headers is "not great but ok", then maybe we leave it
  alone.

Thanks,
-John

On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate Otoya wrote:
> John and team,
>
> The following changes have been applied to the KIP following your feedback:
>
> - Leverage `Record<K, V>` instead of introducing a new type
>   (`RecordValue<V>`).
> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as value.
> - Extend `Record<K, V>` to:
>   - Implement `RecordMetadata` to expose `topic`, `partition`, and `offset`
>   - Use the `Headers` abstraction introduced in this KIP instead of the core one
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
>
> Looking forward to your feedback.
>
> Have a great weekend!
>
> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > > What do you think about instead adding topic and
> > > partition to Record?
> >
> > This is a very interesting idea. Forgot to consider this addition from
> > KIP-478.
> >
> > `Record` would also require `offset`. Maybe implementing `RecordMetadata`
> > and adding these fields as part of the constructor to keep them immutable
> > in comparison to the other fields?
> > It would also need to change `Record`'s headers type to the new one
> > proposed on this KIP.
> >
> > Let me explore this approach in more detail and update the KIP.
> >
> > > I find the name "mapRecordValue" to be a bit confusing
> > > because it seems like it would map the value of a record.
> > > What do you think about "mapValueToRecord" instead?
> >
> > Agree. It will depend on how we solve 1).
> > If we end up using `Record` then
> > `mapValueToRecord` will make even more sense.
> >
> > > I agree with adding the serde explicitly. However, it
> > > would be good to state whether and when we'll automatically
> > > wrap a value serde. For example, if the value serde is known
> > > (or if we're using a default serde from the config), will
> > > Streams automatically wrap it downstream of the
> > > record-mapping operator?
> >
> > Good point. The goal is as you describe it: only when `mapValueToRecord`
> > is called, the Serde will be implicitly applied.
> > Will make this explicit on the KIP.
> >
> >
> > On Wed, 9 Feb 2022 at 20:05, John Roesler <vvcep...@apache.org> wrote:
> >
> > > Hello Jorge,
> > >
> > > Thanks for bringing this up again!
> > >
> > > I've just read over the current version of the KIP.
> > >
> > > 1) I wonder if we really need RecordValue, since we now have
> > > Record, and they are almost the same, both in API and in
> > > purpose. What do you think about instead adding topic and
> > > partition to Record?
> > >
> > > 2) I find the name "mapRecordValue" to be a bit confusing
> > > because it seems like it would map the value of a record.
> > > What do you think about "mapValueToRecord" instead?
> > >
> > > 3) I agree with adding the serde explicitly. However, it
> > > would be good to state whether and when we'll automatically
> > > wrap a value serde. For example, if the value serde is known
> > > (or if we're using a default serde from the config), will
> > > Streams automatically wrap it downstream of the
> > > record-mapping operator?
> > >
> > > Otherwise, your proposal looks good to me!
> > >
> > > Thanks,
> > > -John
> > >
> > > On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> > > Otoya wrote:
> > > > Hi Dev team,
> > > >
> > > > I'd like to revamp the KIP again:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > >
> > > > - Reference implementation is now using the latest `Processor` API from
> > > >   KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> > > >   Processors backing changes on the KStream API.
> > > > - It is proposing to still extend `To` class for backwards
> > > >   compatibility.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > Regards,
> > > > Jorge.
> > > >
> > > > On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> > > > quilcate.jo...@gmail.com> wrote:
> > > >
> > > > > Hi everyone!
> > > > >
> > > > > I'd like to revamp this KIP. I have made some significant changes on
> > > > > the scope:
> > > > > - Added `mapRecordValue` to map not only headers, but other record
> > > > >   metadata: topic name, partition, offset, and timestamp into a new type
> > > > >   `RecordValue<V>`.
> > > > > - Added a serde for `RecordValue` to support stateful operations.
> > > > > - Added `setRecordHeaders` to apply headers to record crossing the
> > > > >   stream.
> > > > > - Added headers to `To` to update headers via `context.forward(k, v,
> > > > >   to)`.
> > > > >
> > > > > New link:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> > > > >
> > > > > Looking forward to your feedback,
> > > > >
> > > > > Cheers and stay safe,
> > > > > Jorge.
> > > > >
> > > > > On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Sophie! Haven't followed KIP-478 but sounds great.
> > > > > > I'll be happy to help on that migration to the new PAPI if it's
> > > > > > still an open issue. We can bump this KIP after that.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > > > > >
> > > > > > On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> > > > > > sop...@confluent.io> wrote:
> > > > > >
> > > > > > > I *think* that the `To` Matthias was referring to was not
> > > > > > > KStream#to but the To class which is accepted as a possible
> > > > > > > parameter of ProcessorContext#forward (correct me if wrong).
> > > > > > >
> > > > > > > This was on the old ProcessorContext interface, which has now been
> > > > > > > replaced with the new api.ProcessorContext in KIP-478. In the new
> > > > > > > interface we've moved away from the forward signatures that accept
> > > > > > > a separate key or value or timestamp or To, and wrapped all of
> > > > > > > these into a single Record class. This new Record class has the
> > > > > > > headers as a field, so it seems like KIP-478 has happened to solve
> > > > > > > the lack of support for Headers in the PAPI along the way.
> > > > > > >
> > > > > > > This is all somewhat recent, and probably wasn't yet sorted out at
> > > > > > > the time of Matthias' last reply. But given how this worked out it
> > > > > > > seems like we can just focus on adding support for Headers in the
> > > > > > > DSL in this KIP by building off of the groundwork of KIP-478? It
> > > > > > > doesn't seem necessary to go back and add support for headers in
> > > > > > > the old PAPI, since this will (or already has?) been deprecated.
> > > > > > >
> > > > > > > The one challenge is that this will presumably require that we
> > > > > > > migrate all DSL operators to the new PAPI before adding header
> > > > > > > support for those operators. But that definitely sounds achievable
> > > > > > > here.
> > > > > > >
> > > > > > > On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> > > > > > > quilcate.jo...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Matthias,
> > > > > > > >
> > > > > > > > Sorry for the late reply.
> > > > > > > >
> > > > > > > > I like the proposal. Just to check if I got it right:
> > > > > > > >
> > > > > > > > We can extend the `kstream.to()` function to support setting
> > > > > > > > headers, e.g.:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > void to(final String topic,
> > > > > > > >         final Produced<K, V> produced,
> > > > > > > >         final HeadersExtractor<K, V> headersExtractor);
> > > > > > > > ```
> > > > > > > >
> > > > > > > > where `HeadersExtractor`:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > public interface HeadersExtractor<K, V> {
> > > > > > > >     Headers extract(final K key, final V value, final RecordContext recordContext);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > This would require changing `Topology#addSink()` to support this
> > > > > > > > extractor as well.
> > > > > > > >
> > > > > > > > If this is aligned with your proposal, I'm happy to add it to
> > > > > > > > this KIP.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > > > > >
> > > > > > > > On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> > > > > > > > mj...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > Jorge,
> > > > > > > > >
> > > > > > > > > thanks a lot for this KIP. Being able to modify headers is a
> > > > > > > > > very valuable feature.
> > > > > > > > >
> > > > > > > > > However, before we actually expose them in the DSL, I am
> > > > > > > > > wondering if we should improve how headers can be modified in
> > > > > > > > > the PAPI? Currently, it is possible but very clumsy to work
> > > > > > > > > with headers in the Processor API, because of two reasons:
> > > > > > > > >
> > > > > > > > > (1) There is no default `Headers` implementation in the
> > > > > > > > >     public API
> > > > > > > > > (2) There is no explicit way to set headers for output
> > > > > > > > >     records
> > > > > > > > >
> > > > > > > > > Currently, the input record headers are copied into the output
> > > > > > > > > records when `forward()` is called, however, it's not really a
> > > > > > > > > deep copy but we just copy the reference. This implies that one
> > > > > > > > > needs to work with a single mutable object that flows through
> > > > > > > > > multiple processors making it very error prone.
> > > > > > > > >
> > > > > > > > > Furthermore, if you want to emit multiple output records, and
> > > > > > > > > for example want to add two different headers to the output
> > > > > > > > > record (based on the same input headers), you would need to do
> > > > > > > > > something like this:
> > > > > > > > >
> > > > > > > > >     Headers h = context.headers();
> > > > > > > > >     h.add(...);
> > > > > > > > >     context.forward(...);
> > > > > > > > >     // remove the header you added for the first output record
> > > > > > > > >     h.remove(...);
> > > > > > > > >     h.add(...);
> > > > > > > > >     context.forward(...);
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Maybe we could extend `To` to allow passing in a new `Headers`
> > > > > > > > > object (or an `Iterable<Header>` similar to `ProducerRecord`)?
> > > > > > > > > We could either add it to your KIP or do a new KIP just for the
> > > > > > > > > PAPI.
> > > > > > > > >
> > > > > > > > > Thoughts?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > > On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > > > Hi everyone,
> > > > > > > > > >
> > > > > > > > > > Bumping this thread to check if there's any feedback.
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > > Jorge.
> > > > > > > > > >
> > > > > > > > > > On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> > > > > > > > > > Otoya <quilcate.jo...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to start the discussion for KIP-634:
> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > >
> > > > > > > > > > > Thanks!
> > > > > > > > > > > Jorge.
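
A minimal sketch of the alternative to the add/forward/remove/add pattern
that Matthias describes in the thread above: each output record gets its own
copy of the input headers, so nothing has to be undone between forwards and
the input headers are never mutated. The `Headers` class below is a
hypothetical stand-in written for this note, not Kafka's `Headers` interface,
and `fanOut` is an illustrative helper, not a Streams API. It assumes every
output record starts from the same input headers and adds one extra header.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; these are not Kafka's Header/Headers types.
final class HeadersCopySketch {

    static final class Headers {
        private final List<String> entries = new ArrayList<>();

        Headers() {}

        // Copy constructor: changes to the copy never reach the source.
        Headers(Headers source) {
            entries.addAll(source.entries);
        }

        Headers add(String key, String value) {
            entries.add(key + "=" + value);
            return this;
        }

        // Returns a snapshot so callers cannot mutate the internal list.
        List<String> entries() {
            return new ArrayList<>(entries);
        }
    }

    // Builds one independent Headers object per output record, each derived
    // from the same input headers plus one extra "route" header.
    static List<Headers> fanOut(Headers input, String... routes) {
        List<Headers> outputs = new ArrayList<>();
        for (String route : routes) {
            outputs.add(new Headers(input).add("route", route));
        }
        return outputs;
    }

    public static void main(String[] args) {
        Headers input = new Headers().add("trace-id", "abc");
        for (Headers out : fanOut(input, "a", "b")) {
            System.out.println(out.entries());
        }
        // The input headers still hold only the original entry.
        System.out.println(input.entries());
    }
}
```

With a shared mutable object, the second output would also carry the first
output's header unless it were removed by hand; with per-record copies that
step disappears.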