Hi Alieh,

Thanks for the KIP! It's great to see this feature make its way into Kafka
Streams.

I have one follow-up question on the format that Lucas mentioned in LB1.
The Kafka record format stores headers last, after the value, but the KIP
specifies the opposite order: headers first, then the value.
Given this will go in one byte, is there a reason for changing the format?
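
To make the ordering question concrete, here is a rough Java sketch of a
headers-first layout: a varint header count, then a varint-prefixed key and
value per header, then the record value. It is only an illustration. The
class and method names are hypothetical, the unsigned varint flavor is an
assumption, and the exact byte layout in the KIP may differ.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not the KIP's implementation.
public class HeadersFirstLayoutSketch {

    // Unsigned base-128 varint (assumption: the KIP may use another flavor).
    static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static int readVarint(ByteBuffer buf) {
        int result = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            result |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return result;
    }

    // Assumed layout: [count][keyLen][key][valLen][val]...[record value].
    // A Map is a simplification: Kafka headers may repeat a key.
    static byte[] encode(Map<String, byte[]> headers, byte[] value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, headers.size());
        for (Map.Entry<String, byte[]> header : headers.entrySet()) {
            byte[] key = header.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] val = header.getValue();
            writeVarint(out, key.length);
            out.write(key, 0, key.length);
            writeVarint(out, val.length);
            out.write(val, 0, val.length);
        }
        out.write(value, 0, value.length);
        return out.toByteArray();
    }

    // Finds where the record value starts by skipping each header's bytes,
    // without copying or deserializing any header.
    static int valueOffset(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int count = readVarint(buf);
        for (int i = 0; i < count; i++) {
            int keyLen = readVarint(buf);
            buf.position(buf.position() + keyLen);
            int valLen = readVarint(buf);
            buf.position(buf.position() + valLen);
        }
        return buf.position();
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("schema", new byte[] {1, 2});
        byte[] stored = encode(headers, "v1".getBytes(StandardCharsets.UTF_8));
        System.out.println("total bytes: " + stored.length);
        System.out.println("value starts at offset: " + valueOffset(stored));
    }
}
```

In this sketch a reader has to walk every header length before it reaches
the value. With the value first, as in the Kafka record format, the value
would instead need its own length prefix so that the headers can be found.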

Thanks,
Bill


On Wed, Jan 14, 2026 at 1:40 PM Alieh Saeedi via dev <[email protected]>
wrote:

> Hi Matthias and Lucas,
>
> Thank you for the feedback. I have updated the KIP based on our discussion.
> Here are the specific updates and clarifications:
> 1. Header Size and Format
>
> I have removed the two-byte header size and its associated configuration
> property. Since we are using varints for the header count, as well as for
> individual key and value sizes within each header, a global "headers size"
> field is redundant and could be restrictive.
> 2. Session Store Interfaces
>
> Regarding the inclusion of both ReadOnlySessionStoreWithHeaders and
> SessionStoreWithHeaders: In KIP-258, no ReadOnly variant was needed for
> Timestamped stores because of how they were integrated. However, since
> Kafka Streams currently lacks a TimestampedSessionStore, the interfaces
> introduced here are modeled after the existing SessionStore and
> ReadOnlySessionStore for consistency. I am open to removing the ReadOnly
> variant if it is considered over-engineering; I’ll look to Frank for
> further input on this specific architectural choice.
> 3. Versioned Store Hierarchy
>
> On the question of why VersionedKeyValueStoreWithHeaders does not extend
> VersionedKeyValueStore: While we could implement long put(K key, V value,
> long timestamp), it wasn't immediately clear if the inheritance provided a
> strong benefit for this specific header-aware use case. I would appreciate
> your thoughts on whether you see this inheritance as a requirement for API
> consistency.
> 4. Upgrade Path and Versioning
>
> I have clarified the upgrade path in the KIP. To address Lucas’s point
> about magic/version bytes: adding a version byte to every record introduces
> significant storage overhead. Instead, I’ve updated the proposal to follow
> the KIP-258 approach, utilizing a new Column Family in RocksDB for the
> header-aware format. This isolates the data and allows for a cleaner
> migration. Please see the updated "Upgrade Path" subsection for details.
> 5. StateSerdes Public API
>
> While StateSerdes is public, I understand it is rarely used by
> developers. However, it remains a helpful utility for power users
> implementing custom StateStore types for external databases. BTW, if we
> decide it should be internal, we can deprecate it in this KIP.
> 6. Performance & Lazy Deserialization
>
> Thank you, Lucas, for raising the point about Iterators. I have added a
> "Performance Considerations" subsection. While the varint-based format
> requires scanning to find payload offsets, we can implement a "semi-lazy"
> approach. This allows us to scan varints to determine offsets without the
> high GC overhead of allocating objects for data that may not be accessed
> during a range scan.
> 7. Changelog Interaction
>
> To clarify: record headers will be preserved in the changelog topic as
> native Kafka record headers.
>
> Bests,
> Alieh
>
> On Tue, Jan 13, 2026 at 7:10 PM TengYao Chi <[email protected]> wrote:
>
> > Hi Matthias,
> >
> > From my understanding (please correct me if I am wrong):
> > KV/Window Stores: Use ValueTimestampHeaders<V> because they store
> > individual records that always require a timestamp.
> >
> > Session Stores: Use AggregationWithHeaders<AGG> because the AGG result is
> > generic (e.g., Long, String, or POJO), and session boundaries are managed
> > by the store rather than the value wrapper.
> >
> > Additionally, during session merges, the application logic must decide
> > which headers to persist. This wrapper provides a clean way to store that
> > custom-processed metadata. The downside is that the user must explicitly
> > handle how to merge headers.
> >
> > I am also wondering if we could just use ValueTimestampHeaders directly
> > to simplify the API?
> >
> > Best,
> > TengYao
> >
> > On 2026/01/12 18:44:54 Alieh Saeedi via dev wrote:
> > > Hey Chia-Ping,
> > >
> > > Thanks for the feedback.
> > >
> > > chia_0: the headers must be serialized as a byte array. I updated the
> > > KIP accordingly.
> > > chia_1: this is the reason different store types are introduced. The
> > > newly introduced stores keep the headers, while the current ones do not.
> > > chia_2: that’s a valid point. I think it’s better to keep the order
> > > rather than change it.
> > >
> > >
> > > Bests,
> > > Alieh
> > >
> > > On Sat, Jan 10, 2026 at 2:21 AM Chia-Ping Tsai <[email protected]>
> > > wrote:
> > >
> > > > hi Alieh
> > > >
> > > > Thanks for the KIP. This proposal seems to open the door for many
> > > > interesting use cases. I have a few questions.
> > > >
> > > > chia_0: could you clarify the serialization format of headers_bytes?
> > > >
> > > > chia_1: how does the state store distinguish between legacy values
> > > > and new values with headers? Since the new format starts with a
> > > > 2-byte length, is there a risk of ambiguity with existing data?
> > > >
> > > > chia_2: does the implementation guarantee that the order of headers
> > > > is preserved?
> > > >
> > > > Best,
> > > > Chia-Ping
> > > >
> > > >
> > > >
> > > > > On Jan 10, 2026, at 6:14 AM, Alieh Saeedi via dev <[email protected]>
> > > > > wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I’d like to start a discussion on KIP-1271, which proposes allowing
> > > > > Kafka Streams state stores to preserve record headers.
> > > > > This would let header-based metadata like schema IDs, tracing info,
> > > > > and feature flags be stored and restored alongside values.
> > > > > The KIP introduces header-aware store types and a small config to
> > > > > cap the size of headers written into state.
> > > > > Details are in the KIP:
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> > > > > .
> > > > > I’d appreciate your feedback and questions on the proposal.
> > > > >
> > > > > Thanks,
> > > > > Alieh
> > > >
> > >
> >
>
