Hi all,

With the acceptance of KIP-820, which will enable easier access to the
Record's metadata and headers, and the potential design of a new version
of the DSL, I will set this KIP as inactive/dormant for the time being.
Thanks, everyone, for the great discussions!

Jorge.

On Wed, 16 Feb 2022 at 00:08, Matthias J. Sax <mj...@apache.org> wrote:

> Sorry for playing devil's advocate, but do we really think it's a good
> way to design the API? To me, it feels like a cumbersome workaround.
>
> Personally, I believe that we are hitting a point for the DSL that
> requires a redesign from scratch. When the DSL was designed 5 years ago,
> record timestamp was just newly added (and did not play a significant
> role yet), and there were no record headers. That's why we have a
> kv-based model with `KStream<K,V>` and `KTable<K,V>` types.
>
> Given the changes in Kafka (and Kafka Streams) that accumulated over the
> last 5 years, it seems that a better API would be to have a
> `KStream<Record<K,V>>` and `KTable<Record<K,V>>` model.
>
> I also think that we should clearly separate (modifiable) data (key,
> value, timestamp, headers) from (immutable) meta-data (topic name,
> partition, offset). And to go one step further, I am not even sure if
> meta-data makes much sense for non-source records (e.g., `offset` is
> more or less useless after a flatMap(), aggregation() -- similar for
> topic and partition after a join()). It would make sense to me to take
> considerations like this into account when designing the API.
>
> I would propose to not move forward with a "hacky design" (sorry for the
> strong terminology...) but to start a DSL 2.0 discussion to come up
> with a sensible long-term re-design. Of course, it would be a much
> larger effort, and might not provide a short term fix.
>
> A DSL 2.0 would not only allow us to add support for headers, but also
> to fix many other issues in the DSL that are not easily fixed without
> breaking compatibility (one example is KIP-300:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
> )
>
> Thoughts?
>
> -Matthias
>
> On 2/15/22 11:55 AM, John Roesler wrote:
> > Thanks for the update, Jorge,
> >
> > I've just looked over the KIP again. Just one more small
> > concern:
> >
> > 5) We can't just change the type of Record#headers() to a
> > new fully qualified type. That would be a source-incompatible
> > breaking change for users.
> >
> > Our options are:
> > * Deprecate the existing method and create a new one with
> >   the new type
> > * If the existing Headers is "not great but ok", then maybe
> >   we leave it alone.
> >
> > Thanks,
> > -John
> >
> > On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate Otoya wrote:
> >> John and team,
> >>
> >> The following changes have been applied to the KIP following your
> >> feedback:
> >>
> >> - Leverage `Record<K, V>` instead of introducing a new type
> >>   (`RecordValue<V>`).
> >> - `RecordSerde<K, V>` for stateful operations using `Record<K, V>` as
> >>   value.
> >> - Extend `Record<K, V>` to:
> >>   - Implement `RecordMetadata` to expose `topic`, `partition`, and
> >>     `offset`
> >>   - Use the `Headers` abstraction introduced in this KIP instead of
> >>     the core one
> >>
> >> KIP:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Have a great weekend!
> >>
> >> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> >> quilcate.jo...@gmail.com> wrote:
> >>
> >>>> What do you think about instead adding topic and
> >>>> partition to Record?
> >>>
> >>> This is a very interesting idea. Forgot to consider this addition from
> >>> KIP-478.
> >>>
> >>> `Record` would also require `offset`. Maybe implementing `RecordMetadata`
> >>> and adding these fields as part of the constructor to keep them immutable
> >>> in comparison to the other fields?
> >>> It would also need to change `Record`'s headers type to the new one
> >>> proposed on this KIP.
> >>>
> >>> Let me explore this approach in more detail and update the KIP.
> >>>
> >>>> I find the name "mapRecordValue" to be a bit confusing
> >>>> because it seems like it would map the value of a record.
> >>>> What do you think about "mapValueToRecord" instead?
> >>>
> >>> Agree. It will depend on how we solve 1). If we end up using `Record` then
> >>> `mapValueToRecord` will make even more sense.
> >>>
> >>>> I agree with adding the serde explicitly. However, it
> >>>> would be good to state whether and when we'll automatically
> >>>> wrap a value serde. For example, if the value serde is known
> >>>> (or if we're using a default serde from the config), will
> >>>> Streams automatically wrap it downstream of the
> >>>> record-mapping operator?
> >>>
> >>> Good point. The goal is as you describe it: only when `mapValueToRecord`
> >>> is called, the Serde will be implicitly applied.
> >>> Will make this explicit on the KIP.
> >>>
> >>>
> >>> On Wed, 9 Feb 2022 at 20:05, John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>>> Hello Jorge,
> >>>>
> >>>> Thanks for bringing this up again!
> >>>>
> >>>> I've just read over the current version of the KIP.
> >>>>
> >>>> 1) I wonder if we really need RecordValue, since we now have
> >>>> Record, and they are almost the same, both in API and in
> >>>> purpose. What do you think about instead adding topic and
> >>>> partition to Record?
> >>>>
> >>>> 2) I find the name "mapRecordValue" to be a bit confusing
> >>>> because it seems like it would map the value of a record.
> >>>> What do you think about "mapValueToRecord" instead?
> >>>>
> >>>> 3) I agree with adding the serde explicitly. However, it
> >>>> would be good to state whether and when we'll automatically
> >>>> wrap a value serde. For example, if the value serde is known
> >>>> (or if we're using a default serde from the config), will
> >>>> Streams automatically wrap it downstream of the
> >>>> record-mapping operator?
> >>>>
> >>>> Otherwise, your proposal looks good to me!
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate Otoya wrote:
> >>>>> Hi Dev team,
> >>>>>
> >>>>> I'd like to revamp the KIP again:
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>
> >>>>> - Reference implementation is now using the latest `Processor` API from
> >>>>>   KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> >>>>>   Processors backing changes on the KStream API.
> >>>>> - It is proposing to still extend the `To` class for backwards
> >>>>>   compatibility.
> >>>>>
> >>>>> Looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Jorge.
> >>>>>
> >>>>> On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> >>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi everyone!
> >>>>>>
> >>>>>> I'd like to revamp this KIP. I have made some significant changes on the
> >>>>>> scope:
> >>>>>> - Added `mapRecordValue` to map not only headers, but other record
> >>>>>>   metadata: topic name, partition, offset, and timestamp into a new type
> >>>>>>   `RecordValue<V>`.
> >>>>>> - Added a serde for `RecordValue` to support stateful operations.
> >>>>>> - Added `setRecordHeaders` to apply headers to records crossing the stream.
> >>>>>> - Added headers to `To` to update headers via `context.forward(k, v, to)`.
> >>>>>>
> >>>>>> New link:
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>>
> >>>>>> Looking forward to your feedback,
> >>>>>>
> >>>>>> Cheers and stay safe,
> >>>>>> Jorge.
> >>>>>>
> >>>>>> On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks Sophie!
> >>>>>>> Haven't followed KIP-478 but sounds great.
> >>>>>>> I'll be happy to help on that migration to the new PAPI if it's still an
> >>>>>>> open issue. We can bump this KIP after that.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jorge.
> >>>>>>>
> >>>>>>> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <sop...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I *think* that the `To` Matthias was referring to was not KStream#to but
> >>>>>>>> the To class which is accepted as a possible parameter of
> >>>>>>>> ProcessorContext#forward (correct me if wrong).
> >>>>>>>>
> >>>>>>>> This was on the old ProcessorContext interface, which has now been
> >>>>>>>> replaced with the new api.ProcessorContext in KIP-478. In the new
> >>>>>>>> interface we've moved away from the forward signatures that accept a
> >>>>>>>> separate key or value or timestamp or To, and wrapped all of these into
> >>>>>>>> a single Record class. This new Record class has the headers as a field,
> >>>>>>>> so it seems like KIP-478 has happened to solve the lack of support for
> >>>>>>>> Headers in the PAPI along the way.
> >>>>>>>>
> >>>>>>>> This is all somewhat recent, and probably wasn't yet sorted out at the
> >>>>>>>> time of Matthias' last reply. But given how this worked out it seems
> >>>>>>>> like we can just focus on adding support for Headers in the DSL in this
> >>>>>>>> KIP by building off of the groundwork of KIP-478? It doesn't seem
> >>>>>>>> necessary to go back and add support for headers in the old PAPI, since
> >>>>>>>> this will be (or already has been?) deprecated.
> >>>>>>>>
> >>>>>>>> The one challenge is that this will presumably require that we migrate
> >>>>>>>> all DSL operators to the new PAPI before adding header support for those
> >>>>>>>> operators.
> >>>>>>>> But that definitely sounds achievable here
> >>>>>>>>
> >>>>>>>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> >>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Sorry for the late reply.
> >>>>>>>>>
> >>>>>>>>> I like the proposal. Just to check if I got it right:
> >>>>>>>>>
> >>>>>>>>> We can extend the `kstream.to()` function to support setting headers.
> >>>>>>>>> e.g.:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> void to(final String topic,
> >>>>>>>>>         final Produced<K, V> produced,
> >>>>>>>>>         final HeadersExtractor<K, V> headersExtractor);
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> where `HeadersExtractor`:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> public interface HeadersExtractor<K, V> {
> >>>>>>>>>     Headers extract(final K key, final V value, final RecordContext recordContext);
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> This would require changing `Topology#addSink()` to support this
> >>>>>>>>> extractor as well.
> >>>>>>>>>
> >>>>>>>>> If this is aligned with your proposal, I'm happy to add it to this KIP.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>>>>
> >>>>>>>>> On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <mj...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Jorge,
> >>>>>>>>>>
> >>>>>>>>>> thanks a lot for this KIP. Being able to modify headers is a very
> >>>>>>>>>> valuable feature.
> >>>>>>>>>>
> >>>>>>>>>> However, before we actually expose them in the DSL, I am wondering if we
> >>>>>>>>>> should improve how headers can be modified in the PAPI?
> >>>>>>>>>> Currently, it is possible but very clumsy to work with headers in the
> >>>>>>>>>> Processor API, because of two reasons:
> >>>>>>>>>>
> >>>>>>>>>> (1) There is no default `Headers` implementation in the public API
> >>>>>>>>>> (2) There is no explicit way to set headers for output records
> >>>>>>>>>>
> >>>>>>>>>> Currently, the input record headers are copied into the output records
> >>>>>>>>>> when `forward()` is called, however, it's not really a deep copy but we
> >>>>>>>>>> just copy the reference. This implies that one needs to work with a
> >>>>>>>>>> single mutable object that flows through multiple processors making it
> >>>>>>>>>> very error prone.
> >>>>>>>>>>
> >>>>>>>>>> Furthermore, if you want to emit multiple output records, and for
> >>>>>>>>>> example want to add two different headers to the output record (based on
> >>>>>>>>>> the same input headers), you would need to do something like this:
> >>>>>>>>>>
> >>>>>>>>>>   Headers h = context.headers();
> >>>>>>>>>>   h.add(...);
> >>>>>>>>>>   context.forward(...);
> >>>>>>>>>>   // remove the header you added for the first output record
> >>>>>>>>>>   h.remove(...);
> >>>>>>>>>>   h.add(...);
> >>>>>>>>>>   context.forward(...);
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Maybe we could extend `To` to allow passing in a new `Headers` object
> >>>>>>>>>> (or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
> >>>>>>>>>> add it to your KIP or do a new KIP just for the PAPI.
> >>>>>>>>>>
> >>>>>>>>>> Thoughts?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> Bumping this thread to check if there's any feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> >>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I would like to start the discussion for KIP-634:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks!
> >>>>>>>>>>>> Jorge.
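
The header-aliasing concern raised in Matthias's 2020 message (output records sharing one mutable headers reference) can be illustrated with a small sketch. This is plain Java with no Kafka dependency: `SimpleHeaders`, `forwardShared`, and `forwardCopied` are hypothetical names invented for illustration and are not part of the Kafka Streams API. It only assumes that a downstream consumer keeps the reference it was handed, which is the situation in which the add/remove/add pattern from the thread becomes error prone.

```java
import java.util.ArrayList;
import java.util.List;

public class HeaderAliasingSketch {

    /** Hypothetical stand-in for a mutable headers collection; NOT Kafka's Headers API. */
    static final class SimpleHeaders {
        private final List<String> keys = new ArrayList<>();

        SimpleHeaders add(String key) { keys.add(key); return this; }

        SimpleHeaders remove(String key) { keys.remove(key); return this; }

        /** Returns an independent copy so later mutations do not leak. */
        SimpleHeaders copy() {
            SimpleHeaders c = new SimpleHeaders();
            c.keys.addAll(keys);
            return c;
        }

        /** Snapshot of the current header keys. */
        List<String> keys() { return new ArrayList<>(keys); }
    }

    /** Forwards the same mutable object twice, mutating it in between. */
    static List<SimpleHeaders> forwardShared(SimpleHeaders input) {
        List<SimpleHeaders> forwarded = new ArrayList<>();
        input.add("first");
        forwarded.add(input);      // first output record
        input.remove("first");     // undo the header added for the first record
        input.add("second");
        forwarded.add(input);      // second output record: same object as the first
        return forwarded;
    }

    /** Forwards an independent copy of the input headers per output record. */
    static List<SimpleHeaders> forwardCopied(SimpleHeaders input) {
        List<SimpleHeaders> forwarded = new ArrayList<>();
        forwarded.add(input.copy().add("first"));
        forwarded.add(input.copy().add("second"));
        return forwarded;
    }

    public static void main(String[] args) {
        List<SimpleHeaders> shared = forwardShared(new SimpleHeaders().add("trace"));
        List<SimpleHeaders> copied = forwardCopied(new SimpleHeaders().add("trace"));
        System.out.println("shared: " + shared.get(0).keys() + " / " + shared.get(1).keys());
        System.out.println("copied: " + copied.get(0).keys() + " / " + copied.get(1).keys());
    }
}
```

In the shared variant both retained references end up showing the second record's headers, while the copied variant keeps each output record's headers independent. That independence is roughly what passing a fresh headers object per forwarded record, as discussed in the thread, is meant to provide.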