Jorge,

Thanks a lot for this KIP. Being able to modify headers is a very
valuable feature.

However, before we actually expose them in the DSL, I am wondering if we
should first improve how headers can be modified in the PAPI. Currently,
it is possible but very clumsy to work with headers in the Processor API,
for two reasons:

 (1) There is no default `Headers` implementation in the public API
 (2) There is no explicit way to set headers for output records

Currently, the input record headers are carried over to the output records
when `forward()` is called. However, it is not a deep copy: we just copy
the reference. This implies that one needs to work with a single mutable
object that flows through multiple processors, which makes it very
error-prone.
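
To illustrate the aliasing issue, here is a minimal stand-alone sketch. It
does not use any Kafka classes: `SharedHeadersSketch`,
`forwardByReference()` and `forwardByCopy()` are made-up names, and a plain
`List<String>` stands in for `Headers`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, NOT Kafka code: a mutable header collection that is
// handed to the next processor either by reference or as a copy.
class SharedHeadersSketch {

    // Models the behavior described above: only the reference is passed on.
    static List<String> forwardByReference(List<String> inputHeaders) {
        return inputHeaders;
    }

    // Models what a real copy would look like instead.
    static List<String> forwardByCopy(List<String> inputHeaders) {
        return new ArrayList<>(inputHeaders);
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>(List.of("trace-id=abc"));

        // A downstream processor adds a header to what it received...
        List<String> downstream = forwardByReference(input);
        downstream.add("added-downstream=1");
        // ...and the upstream view changes too, because it is the same object.
        System.out.println(input.size()); // prints 2

        // With a copy, the downstream change stays local.
        List<String> copy = forwardByCopy(input);
        copy.add("added-downstream=2");
        System.out.println(input.size()); // still prints 2
    }
}
```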

Furthermore, if you want to emit multiple output records and, for
example, add a different header to each of them (based on the same input
headers), you would need to do something like this:

  Headers h = context.headers();
  h.add(...);
  context.forward(...);
  // remove the header you added for the first output record
  h.remove(...);
  h.add(...);
  context.forward(...);


Maybe we could extend `To` to allow passing in a new `Headers` object
(or an `Iterable<Header>` similar to `ProducerRecord`)? We could either
add it to your KIP or do a new KIP just for the PAPI.
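
To make the idea a bit more concrete, usage could look roughly like the
sketch below. This is only an illustration: `withHeaders()` does not exist
on `To` today, and `ToSketch`, `ContextSketch` and `TwoOutputsSketch` are
hypothetical stand-ins (with a plain `Map` instead of `Headers`), not Kafka
classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: none of these types are Kafka classes.
class ToSketch {
    final Map<String, String> headers; // null means "keep the input headers"

    private ToSketch(Map<String, String> headers) {
        this.headers = headers;
    }

    static ToSketch all() {
        return new ToSketch(null);
    }

    // Hypothetical method: attach a fresh set of headers to one output record.
    ToSketch withHeaders(Map<String, String> newHeaders) {
        return new ToSketch(new LinkedHashMap<>(newHeaders));
    }
}

// Hypothetical context that records the headers of each forwarded record.
class ContextSketch {
    final Map<String, String> inputHeaders = new LinkedHashMap<>();
    final List<Map<String, String>> forwarded = new ArrayList<>();

    void forward(String key, String value, ToSketch to) {
        Map<String, String> base = (to.headers != null) ? to.headers : inputHeaders;
        forwarded.add(new LinkedHashMap<>(base));
    }
}

class TwoOutputsSketch {
    // Emits two records from one input, each with its own extra header,
    // without adding to and removing from a shared mutable object.
    static List<Map<String, String>> run() {
        ContextSketch context = new ContextSketch();
        context.inputHeaders.put("trace-id", "abc");

        Map<String, String> first = new LinkedHashMap<>(context.inputHeaders);
        first.put("route", "a");
        context.forward("k", "v1", ToSketch.all().withHeaders(first));

        Map<String, String> second = new LinkedHashMap<>(context.inputHeaders);
        second.put("route", "b");
        context.forward("k", "v2", ToSketch.all().withHeaders(second));

        return context.forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Each output record gets its own header set derived from the input headers,
so the add/remove dance from the snippet above is no longer needed.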

Thoughts?


-Matthias

On 7/16/20 4:05 PM, Jorge Esteban Quilcate Otoya wrote:
> Hi everyone,
> 
> Bumping this thread to check if there's any feedback.
> 
> Cheers,
> Jorge.
> 
> On Tue, Jun 30, 2020 at 12:46 AM Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
> 
>> Hi everyone,
>>
>> I would like to start the discussion for
>> KIP-634: https://cwiki.apache.org/confluence/display/KAFKA/KIP-634%3A+Complementary+support+for+headers+in+Kafka+Streams+DSL
>>
>> Looking forward to your feedback.
>>
>> Thanks!
>> Jorge.
>>
>>
>>
>>
> 
