Hi all,

With the acceptance of KIP-820, which will enable easier access to the
Record's metadata and headers, and the potential design of a new version of
the DSL, I will mark this KIP as inactive/dormant for the time being.

Thanks, everyone, for the great discussions!
Jorge.

On Wed, 16 Feb 2022 at 00:08, Matthias J. Sax <mj...@apache.org> wrote:

> Sorry for playing devil's advocate, but do we really think it's a good
> way to design the API? To me, it feels like a cumbersome workaround.
>
> Personally, I believe that we are hitting a point for the DSL that
> requires a redesign from scratch. When the DSL was designed 5 years ago,
> record timestamps had only just been added (and did not play a significant
> role yet), and there were no record headers. That's why we have a
> kv-based model with `KStream<K,V>` and `KTable<K,V>` types.
>
> Given the changes in Kafka (and Kafka Streams) that accumulated over the
> last 5 years, it seems that a better API would be to have a
> `KStream<Record<K,V>>` and `KTable<Record<K,V>>` model.
>
> I also think that we should clearly separate (modifiable) data (key,
> value, timestamp, headers) from (immutable) meta-data (topic name,
> partition, offset). And to go one step further, I am not even sure if
> meta-data makes much sense for non-source records (e.g., `offset` is
> more or less useless after a `flatMap()` or an aggregation, and the same
> holds for topic and partition after a `join()`). It would make sense to
> me to take considerations like this into account when designing the API.
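A minimal Java sketch of the data vs. meta-data split described above, assuming that source meta-data is optional and is dropped by one-to-many operators. `DslRecord` and `SourceMetadata` are hypothetical names used only for illustration; they are not Kafka Streams types, and a `Map` stands in for headers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical immutable meta-data: only meaningful for records read directly from a source topic.
record SourceMetadata(String topic, int partition, long offset) {}

// Hypothetical record type: key, value, timestamp and headers are "data" that operators
// may replace; the source meta-data can only be carried along unchanged or dropped.
record DslRecord<K, V>(K key, V value, long timestamp, Map<String, byte[]> headers,
                       Optional<SourceMetadata> metadata) {

    // One-to-one value mapping: the output still corresponds to the same source record,
    // so this sketch keeps the meta-data.
    <VR> DslRecord<K, VR> withValue(VR newValue) {
        return new DslRecord<>(key, newValue, timestamp, headers, metadata);
    }

    // One-to-many mapping: a single offset no longer identifies any output record,
    // so this sketch drops the meta-data instead of copying it to every output.
    <VR> List<DslRecord<K, VR>> flatMapValues(Function<V, List<VR>> mapper) {
        List<DslRecord<K, VR>> out = new ArrayList<>();
        for (VR v : mapper.apply(value)) {
            out.add(new DslRecord<>(key, v, timestamp, headers, Optional.empty()));
        }
        return out;
    }
}
```

Making the meta-data an `Optional` forces every operator to decide explicitly whether it can still vouch for the source position.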
>
> I would propose to not move forward with a "hacky design" (sorry for the
> strong terminology...) but to start a DSL 2.0 discussion to come up
> with a sensible long-term re-design. Of course, it would be a much
> larger effort, and might not provide a short-term fix.
>
> A DSL 2.0 would not only allow us to add support for headers, but also
> to fix many other issues in the DSL that cannot easily be fixed without
> breaking compatibility (one example is KIP-300:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
> ).
>
> Thoughts?
>
>
> -Matthias
>
> On 2/15/22 11:55 AM, John Roesler wrote:
> > Thanks for the update, Jorge,
> >
> > I've just looked over the KIP again. Just one more small
> > concern:
> >
> > 5) We can't just change the type of Record#headers() to a
> > new fully qualified type. That would be a source-
> > incompatible breaking change for users.
> >
> > Our options are:
> > * Deprecate the existing method and create a new one with
> > the new type
> > * If the existing Headers is "not great but ok", then maybe
> > we leave it alone.
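The first option is the usual deprecate-and-add pattern. A rough Java sketch follows; all names (`LegacyHeaders`, `StreamsHeaders`, `HeadersCompatRecord`, `headersV2()`) are hypothetical placeholders, not names from the KIP or from Kafka. It shows that callers of the old `headers()` keep compiling while new code opts into the new type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the headers type that user code already compiles against.
interface LegacyHeaders {
    byte[] lastValue(String key);
}

// Hypothetical stand-in for the new headers abstraction proposed in the KIP.
interface StreamsHeaders {
    byte[] lastValue(String key);

    StreamsHeaders withHeader(String key, byte[] value);
}

final class HeadersCompatRecord {
    private final Map<String, byte[]> entries;

    HeadersCompatRecord(Map<String, byte[]> entries) {
        this.entries = Map.copyOf(entries);
    }

    /** @deprecated kept so existing callers still compile; use {@link #headersV2()} instead. */
    @Deprecated
    LegacyHeaders headers() {
        return entries::get;
    }

    // New accessor with the new return type, added next to the old one
    // instead of changing the return type of headers().
    StreamsHeaders headersV2() {
        return view(entries);
    }

    private static StreamsHeaders view(Map<String, byte[]> map) {
        return new StreamsHeaders() {
            @Override
            public byte[] lastValue(String key) {
                return map.get(key);
            }

            @Override
            public StreamsHeaders withHeader(String key, byte[] value) {
                Map<String, byte[]> copy = new HashMap<>(map);
                copy.put(key, value);
                return view(copy); // the original headers stay unchanged
            }
        };
    }
}
```

The cost of this option is a second accessor that typically has to live alongside the deprecated one until a major release allows removing it.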
> >
> > Thanks,
> > -John
> >
> >
> > On Fri, 2022-02-11 at 20:40 +0000, Jorge Esteban Quilcate
> > Otoya wrote:
> >> John and team,
> >>
> >> Based on your feedback, the following changes have been applied to the
> >> KIP:
> >>
> >> - Leverage `Record<K, V>` instead of introducing a new type
> >>   (`RecordValue<V>`).
> >> - Add `RecordSerde<K, V>` for stateful operations that use `Record<K, V>`
> >>   as the value.
> >> - Extend `Record<K, V>` to:
> >>    - implement `RecordMetadata` to expose `topic`, `partition`, and
> >>      `offset`;
> >>    - use the `Headers` abstraction introduced in this KIP instead of the
> >>      core one.
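A minimal Java sketch of "`Record<K, V>` implements `RecordMetadata`", assuming the metadata is fixed at construction and copy-style methods carry it over unchanged. `MetadataView` and `RecordSketch` are simplified, hypothetical stand-ins, not the actual Kafka Streams classes (the real `Record` also has a timestamp and headers).

```java
// Hypothetical stand-in mirroring the three accessors the KIP wants Record to expose.
interface MetadataView {
    String topic();

    int partition();

    long offset();
}

// Simplified stand-in for Record<K, V>. The meta-data is passed to the constructor,
// and the generated accessors topic(), partition() and offset() implement MetadataView.
record RecordSketch<K, V>(K key, V value, String topic, int partition, long offset)
        implements MetadataView {

    // Replacing the value returns a copy that carries the same meta-data.
    <VR> RecordSketch<K, VR> withValue(VR newValue) {
        return new RecordSketch<>(key, newValue, topic, partition, offset);
    }
}
```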
> >>
> >> KIP:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>
> >> Looking forward to your feedback.
> >>
> >> Have a great weekend!
> >>
> >> On Thu, 10 Feb 2022 at 13:15, Jorge Esteban Quilcate Otoya <
> >> quilcate.jo...@gmail.com> wrote:
> >>
> >>>> What do you think about instead adding topic and
> >>>> partition to Record?
> >>>
> >>> This is a very interesting idea. I had forgotten to consider this
> >>> addition from KIP-478.
> >>>
> >>> `Record` would also require `offset`. Maybe it could implement
> >>> `RecordMetadata` and take these fields as constructor arguments, so
> >>> that, unlike the other fields, they cannot be changed?
> >>> It would also require changing `Record`'s headers type to the new one
> >>> proposed in this KIP.
> >>>
> >>> Let me explore this approach in more detail and update the KIP.
> >>>
> >>>> I find the name "mapRecordValue" to be a bit confusing
> >>>> because it seems like it would map the value of a record.
> >>>> What do you think about "mapValueToRecord" instead?
> >>>
> >>> Agreed. It will depend on how we solve 1). If we end up using `Record`,
> >>> then `mapValueToRecord` will make even more sense.
> >>>
> >>>> I agree with adding the serde explicitly. However, it
> >>>> would be good to state whether and when we'll automatically
> >>>> wrap a value serde. For example, if the value serde is known
> >>>> (or if we're using a default serde from the config), will
> >>>> Streams automatically wrap it downstream of the
> >>>> record-mapping operator?
> >>>
> >>> Good point. The goal is as you describe it: the Serde will be applied
> >>> implicitly only when `mapValueToRecord` is called.
> >>> I will make this explicit in the KIP.
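One way to read "automatically wrap a value serde" is a serde that delegates the value bytes to the user's serde and adds the record context around them. The Java sketch below assumes a made-up wire format (timestamp, header count, headers, value) and uses `java.util.function.Function` in place of Kafka's `Serializer`/`Deserializer`. `ValueWithContext` and `WrappingSerde` are hypothetical names; the `Map` is a simplification (Kafka headers may repeat keys), and null values are not handled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical value type produced by a mapValueToRecord-like operator.
record ValueWithContext<V>(V value, long timestamp, Map<String, byte[]> headers) {}

// Hypothetical wrapping serde. Made-up wire format:
// timestamp | header count | (key, length, bytes)* | value length | value bytes.
final class WrappingSerde<V> {
    private final Function<V, byte[]> valueSerializer;
    private final Function<byte[], V> valueDeserializer;

    WrappingSerde(Function<V, byte[]> valueSerializer, Function<byte[], V> valueDeserializer) {
        this.valueSerializer = valueSerializer;
        this.valueDeserializer = valueDeserializer;
    }

    // Example: wrap a plain UTF-8 string (de)serializer.
    static WrappingSerde<String> forUtf8Strings() {
        return new WrappingSerde<String>(
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
    }

    byte[] serialize(ValueWithContext<V> data) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(data.timestamp());
            out.writeInt(data.headers().size());
            for (Map.Entry<String, byte[]> header : data.headers().entrySet()) {
                out.writeUTF(header.getKey());
                out.writeInt(header.getValue().length);
                out.write(header.getValue());
            }
            byte[] valueBytes = valueSerializer.apply(data.value()); // user's serde does the value
            out.writeInt(valueBytes.length);
            out.write(valueBytes);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    ValueWithContext<V> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long timestamp = in.readLong();
            int headerCount = in.readInt();
            Map<String, byte[]> headers = new LinkedHashMap<>();
            for (int i = 0; i < headerCount; i++) {
                String key = in.readUTF();
                byte[] headerValue = new byte[in.readInt()];
                in.readFully(headerValue);
                headers.put(key, headerValue);
            }
            byte[] valueBytes = new byte[in.readInt()];
            in.readFully(valueBytes);
            return new ValueWithContext<>(valueDeserializer.apply(valueBytes), timestamp, headers);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the wrapper only needs the inner value serde, Streams could build it from whatever value serde is known at the record-mapping operator, which is the case John asks about.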
> >>>
> >>>
> >>> On Wed, 9 Feb 2022 at 20:05, John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>>> Hello Jorge,
> >>>>
> >>>> Thanks for bringing this up again!
> >>>>
> >>>> I've just read over the current version of the KIP.
> >>>>
> >>>> 1) I wonder if we really need RecordValue, since we now have
> >>>> Record, and they are almost the same, both in API and in
> >>>> purpose. What do you think about instead adding topic and
> >>>> partition to Record?
> >>>>
> >>>> 2) I find the name "mapRecordValue" to be a bit confusing
> >>>> because it seems like it would map the value of a record.
> >>>> What do you think about "mapValueToRecord" instead?
> >>>>
> >>>> 3) I agree with adding the serde explicitly. However, it
> >>>> would be good to state whether and when we'll automatically
> >>>> wrap a value serde. For example, if the value serde is known
> >>>> (or if we're using a default serde from the config), will
> >>>> Streams automatically wrap it downstream of the record-
> >>>> mapping operator?
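To make the naming question concrete: the operator maps each *value* to a record, turning a `KStream<K, V>` into a `KStream<K, RecordValue<V>>` (or `KStream<K, Record<K, V>>` if `Record` is reused as suggested in 1)). Below is a list-based Java sketch in which `Incoming`, `KeyedRecord`, and `MapValueToRecordSketch` are hypothetical stand-ins for a stream element, the record-typed value, and the operator.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical element of a KStream<K, V>, together with the context it was read with.
record Incoming<K, V>(K key, V value, long timestamp, String topic) {}

// Hypothetical output element: the key stays the key, and the whole record becomes
// the value, so downstream steps can treat timestamp and topic as ordinary data.
record KeyedRecord<K, V>(K key, Incoming<K, V> value) {}

final class MapValueToRecordSketch {
    // V -> Record: the value is mapped *to a record*, hence "mapValueToRecord".
    static <K, V> List<KeyedRecord<K, V>> mapValueToRecord(List<Incoming<K, V>> stream) {
        return stream.stream()
                .map(r -> new KeyedRecord<K, V>(r.key(), r))
                .collect(Collectors.toList());
    }
}
```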
> >>>>
> >>>> Otherwise, your proposal looks good to me!
> >>>>
> >>>> Thanks,
> >>>> -John
> >>>>
> >>>> On Tue, 2022-02-08 at 18:06 +0000, Jorge Esteban Quilcate
> >>>> Otoya wrote:
> >>>>> Hi Dev team,
> >>>>>
> >>>>> I'd like to revamp the KIP again:
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>
> >>>>> - The reference implementation now uses the latest `Processor` API from
> >>>>>   KIP-478: https://github.com/apache/kafka/pull/10265/files for both
> >>>>>   Processors backing the changes to the KStream API.
> >>>>> - It still proposes extending the `To` class for backwards
> >>>>>   compatibility.
> >>>>>
> >>>>> Looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Jorge.
> >>>>>
> >>>>> On Thu, 4 Mar 2021 at 18:38, Jorge Esteban Quilcate Otoya <
> >>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi everyone!
> >>>>>>
> >>>>>> I'd like to revamp this KIP. I have made some significant changes to
> >>>>>> the scope:
> >>>>>> - Added `mapRecordValue` to map not only headers but also other record
> >>>>>>   metadata (topic name, partition, offset, and timestamp) into a new
> >>>>>>   type, `RecordValue<V>`.
> >>>>>> - Added a serde for `RecordValue` to support stateful operations.
> >>>>>> - Added `setRecordHeaders` to apply headers to records flowing through
> >>>>>>   the stream.
> >>>>>> - Added headers to `To` to update headers via
> >>>>>>   `context.forward(k, v, to)`.
> >>>>>>
> >>>>>> New link:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+and+record+metadata+in+Kafka+Streams+DSL
> >>>>>>
> >>>>>> Looking forward to your feedback,
> >>>>>>
> >>>>>> Cheers and stay safe,
> >>>>>> Jorge.
> >>>>>>
> >>>>>> On Thu, Oct 29, 2020 at 12:33 AM Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks Sophie! I haven't followed KIP-478, but it sounds great.
> >>>>>>> I'll be happy to help with the migration to the new PAPI if it's
> >>>>>>> still an open issue. We can bump this KIP after that.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Jorge.
> >>>>>>>
> >>>>>>> On Wed, Oct 28, 2020 at 7:00 PM Sophie Blee-Goldman <
> >>>> sop...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I *think* that the `To` Matthias was referring to was not KStream#to
> >>>>>>>> but the To class, which is accepted as a possible parameter of
> >>>>>>>> ProcessorContext#forward (correct me if I'm wrong).
> >>>>>>>>
> >>>>>>>> This was on the old ProcessorContext interface, which has now been
> >>>>>>>> replaced with the new api.ProcessorContext in KIP-478. In the new
> >>>>>>>> interface, we've moved away from the forward signatures that accept a
> >>>>>>>> separate key, value, timestamp, or To, and wrapped all of these into
> >>>>>>>> a single Record class. This new Record class has the headers as a
> >>>>>>>> field, so KIP-478 happened to solve the lack of support for Headers
> >>>>>>>> in the PAPI along the way.
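A stdlib-only Java sketch of why a single immutable record removes the need to mutate shared headers between `forward()` calls. `ImmutableRecord` and `CollectingContext` are simplified, hypothetical stand-ins for the KIP-478 `Record` and `api.ProcessorContext`, not the real classes, and a `Map` stands in for `Headers`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified stand-in for a KIP-478 style record: all fields travel together, and
// withHeaders() returns a new instance instead of mutating a shared object.
record ImmutableRecord<K, V>(K key, V value, long timestamp, Map<String, String> headers) {
    ImmutableRecord {
        headers = Map.copyOf(headers); // defensive copy: callers cannot mutate it later
    }

    ImmutableRecord<K, V> withHeaders(Map<String, String> newHeaders) {
        return new ImmutableRecord<>(key, value, timestamp, newHeaders);
    }
}

// Hypothetical context with a single forward(record) method; it only collects the
// forwarded records so the result can be inspected.
final class CollectingContext<K, V> {
    final List<ImmutableRecord<K, V>> forwarded = new ArrayList<>();

    void forward(ImmutableRecord<K, V> record) {
        forwarded.add(record);
    }
}
```

Each output gets its own header set and the input record stays untouched, so no `remove()` step is needed between forwards.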
> >>>>>>>>
> >>>>>>>> This is all somewhat recent, and probably wasn't sorted out yet at
> >>>>>>>> the time of Matthias' last reply. But given how this worked out, it
> >>>>>>>> seems like we can just focus on adding support for Headers in the DSL
> >>>>>>>> in this KIP by building on the groundwork of KIP-478? It doesn't seem
> >>>>>>>> necessary to go back and add support for headers in the old PAPI,
> >>>>>>>> since it will be (or already has been?) deprecated.
> >>>>>>>>
> >>>>>>>> The one challenge is that this will presumably require that we
> >>>>>>>> migrate all DSL operators to the new PAPI before adding header
> >>>>>>>> support for those operators. But that definitely sounds achievable
> >>>>>>>> here.
> >>>>>>>>
> >>>>>>>> On Wed, Oct 28, 2020 at 11:10 AM Jorge Esteban Quilcate Otoya <
> >>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> Sorry for the late reply.
> >>>>>>>>>
> >>>>>>>>> I like the proposal. Just to check if I got it right:
> >>>>>>>>>
> >>>>>>>>> We could extend the `kstream.to()` function to support setting
> >>>>>>>>> headers, e.g.:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>>      void to(final String topic,
> >>>>>>>>>              final Produced<K, V> produced,
> >>>>>>>>>              final HeadersExtractor<K, V> headersExtractor);
> >>>>>>>>> ```
> >>>>>>>>>
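> >>>>>>>>> where `HeadersExtractor` is:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> import org.apache.kafka.common.header.Headers;
> >>>>>>>>> import org.apache.kafka.streams.processor.RecordContext;
> >>>>>>>>>
> >>>>>>>>> // Computes the headers to attach to each record written by the sink.
> >>>>>>>>> public interface HeadersExtractor<K, V> {
> >>>>>>>>>     Headers extract(final K key, final V value, final RecordContext recordContext);
> >>>>>>>>> }
> >>>>>>>>> ```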
> >>>>>>>>>
> >>>>>>>>> This would require changing `Topology#addSink()` to support this
> >>>>>>>>> extractor as well.
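A stdlib-only Java sketch of how a sink could apply such an extractor once per record. `ContextInfo`, `HeadersExtractorSketch`, `OutgoingRecord`, and `SinkSketch` are hypothetical stand-ins (for `RecordContext`, the proposed `HeadersExtractor`, a producer record, and the sink node), and a `Map` replaces `Headers`; none of this is actual `Topology#addSink()` code.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for RecordContext: where the input record came from.
record ContextInfo(String topic, int partition, long offset) {}

// Same shape as the proposed HeadersExtractor, with a Map standing in for Headers.
@FunctionalInterface
interface HeadersExtractorSketch<K, V> {
    Map<String, String> extract(K key, V value, ContextInfo recordContext);
}

// What the sink would hand to the producer once the extractor has run.
record OutgoingRecord<K, V>(String topic, K key, V value, Map<String, String> headers) {}

final class SinkSketch<K, V> {
    private final String topic;
    private final HeadersExtractorSketch<K, V> headersExtractor;

    SinkSketch(String topic, HeadersExtractorSketch<K, V> headersExtractor) {
        this.topic = topic;
        this.headersExtractor = headersExtractor;
    }

    // Called once per record reaching the sink; headers are computed per record,
    // so no headers object is shared between output records.
    OutgoingRecord<K, V> toProducerRecord(K key, V value, ContextInfo context) {
        return new OutgoingRecord<>(topic, key, value, headersExtractor.extract(key, value, context));
    }
}
```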
> >>>>>>>>>
> >>>>>>>>> If this is aligned with your proposal, I'm happy to add it to this KIP.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>>>>
> >>>>>>>>> On Fri, Sep 11, 2020 at 11:03 PM Matthias J. Sax <
> >>>> mj...@apache.org>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Jorge,
> >>>>>>>>>>
> >>>>>>>>>> thanks a lot for this KIP. Being able to modify headers is a very
> >>>>>>>>>> valuable feature.
> >>>>>>>>>>
> >>>>>>>>>> However, before we actually expose them in the DSL, I am wondering
> >>>>>>>>>> if we should first improve how headers can be modified in the PAPI.
> >>>>>>>>>> Currently, it is possible but very clumsy to work with headers in
> >>>>>>>>>> the Processor API, for two reasons:
> >>>>>>>>>>
> >>>>>>>>>>   (1) There is no default `Headers` implementation in the public API.
> >>>>>>>>>>   (2) There is no explicit way to set headers for output records.
> >>>>>>>>>>
> >>>>>>>>>> Currently, the input record headers are copied into the output
> >>>>>>>>>> records when `forward()` is called; however, it's not really a deep
> >>>>>>>>>> copy, since we just copy the reference. This implies that one needs
> >>>>>>>>>> to work with a single mutable object that flows through multiple
> >>>>>>>>>> processors, which is very error-prone.
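The aliasing problem can be reproduced without Kafka. In this Java sketch, `MutableHeaders` is a hypothetical stand-in for a mutable headers object (Kafka's `Headers` interface likewise exposes `add()` and `remove()`), and the two hypothetical `ForwardModels` methods model passing on the reference versus passing on a copy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable headers object.
final class MutableHeaders {
    final List<String> entries = new ArrayList<>();

    MutableHeaders add(String header) {
        entries.add(header);
        return this;
    }

    // Copies the entry list; entries are immutable Strings, so this is a full copy here.
    MutableHeaders copy() {
        MutableHeaders copy = new MutableHeaders();
        copy.entries.addAll(entries);
        return copy;
    }
}

final class ForwardModels {
    // Models the behavior described above: only the reference is passed on,
    // so upstream and downstream processors share one object.
    static MutableHeaders forwardByReference(MutableHeaders input) {
        return input;
    }

    // Models a copy-on-forward alternative: the downstream processor gets its own object.
    static MutableHeaders forwardByCopy(MutableHeaders input) {
        return input.copy();
    }
}
```

A real deep copy of Kafka headers would also have to copy each header's `byte[]` value, which this sketch avoids by using `String` entries.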
> >>>>>>>>>>
> >>>>>>>>>> Furthermore, if you want to emit multiple output records and, for
> >>>>>>>>>> example, add two different headers to the output records (based on
> >>>>>>>>>> the same input headers), you would need to do something like this:
> >>>>>>>>>>
> >>>>>>>>>>    Headers h = context.headers();
> >>>>>>>>>>    h.add(...);
> >>>>>>>>>>    context.forward(...);
> >>>>>>>>>>    // remove the header you added for the first output record
> >>>>>>>>>>    h.remove(...);
> >>>>>>>>>>    h.add(...);
> >>>>>>>>>>    context.forward(...);
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Maybe we could extend `To` to allow passing in a new `Headers`
> >>>>>>>>>> object (or an `Iterable<Header>`, similar to `ProducerRecord`)? We
> >>>>>>>>>> could either add it to your KIP or do a new KIP just for the PAPI.
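A Java sketch of that `To` extension, assuming `To` gains an optional headers override that `forward()` uses instead of the input headers when set. `ToSketch`, `Emitted`, and `ForwardingContext` are hypothetical stand-ins, not the real `To` or `ProcessorContext`, and a `Map` replaces `Headers`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical To-like option object that may carry its own set of headers.
final class ToSketch {
    private final Map<String, String> headers; // null means "no override"

    private ToSketch(Map<String, String> headers) {
        this.headers = headers;
    }

    // In the spirit of To.all(): forward to all children, no header override.
    static ToSketch all() {
        return new ToSketch(null);
    }

    // Returns a new instance, so a ToSketch can be reused safely.
    ToSketch withHeaders(Map<String, String> newHeaders) {
        return new ToSketch(Map.copyOf(newHeaders));
    }

    Optional<Map<String, String>> headers() {
        return Optional.ofNullable(headers);
    }
}

// What the downstream processor would receive.
record Emitted(String key, String value, Map<String, String> headers) {}

// Hypothetical context: forward() uses the headers from ToSketch when present and
// otherwise passes on the input record's headers unchanged.
final class ForwardingContext {
    private final Map<String, String> inputHeaders;
    final List<Emitted> emitted = new ArrayList<>();

    ForwardingContext(Map<String, String> inputHeaders) {
        this.inputHeaders = Map.copyOf(inputHeaders);
    }

    void forward(String key, String value, ToSketch to) {
        emitted.add(new Emitted(key, value, to.headers().orElse(inputHeaders)));
    }
}
```

With this shape, the two outputs each get their own headers, and the add/remove sequence on the shared input headers goes away.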
> >>>>>>>>>>
> >>>>>>>>>> Thoughts?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>
> >>>>>>>>>>> Bumping this thread to check if there's any feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Cheers,
> >>>>>>>>>>> Jorge.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate
> >>>> Otoya <
> >>>>>>>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I would like to start the discussion for KIP-634:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to your feedback.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks!
> >>>>>>>>>>>> Jorge.
