Hi Matthias and Lucas,

Thank you for the feedback. I have updated the KIP based on our discussion. Here are the specific updates and clarifications:

1. Header Size and Format

I have removed the two-byte header size and its associated configuration property. Since we are using varints for the header count, as well as for individual key and value sizes within each header, a global "headers size" field is redundant and could be restrictive.

2. Session Store Interfaces

Regarding the inclusion of both ReadOnlySessionStoreWithHeaders and SessionStoreWithHeaders: In KIP-258, no ReadOnly variant was needed for Timestamped stores because of how they were integrated. However, since Kafka Streams currently lacks a TimestampedSessionStore, the interfaces introduced here are modeled after the existing SessionStore and ReadOnlySessionStore for consistency. I am open to removing the ReadOnly variant if it is considered over-engineering; I’ll look to Frank for further input on this specific architectural choice.

3. Versioned Store Hierarchy

On the question of why VersionedKeyValueStoreWithHeaders does not extend VersionedKeyValueStore: While we could implement long put(K key, V value, long timestamp), it wasn't immediately clear if the inheritance provided a strong benefit for this specific header-aware use case. I would appreciate your thoughts on whether you see this inheritance as a requirement for API consistency.

4. Upgrade Path and Versioning

I have clarified the upgrade path in the KIP. To address Lucas’s point about magic/version bytes: adding a version byte to every record introduces significant storage overhead. Instead, I’ve updated the proposal to follow the KIP-258 approach, utilizing a new Column Family in RocksDB for the header-aware format. This isolates the data and allows for a cleaner migration. Please see the updated "Upgrade Path" subsection for details.

5. StateSerdes Public API

While StateSerdes is public, I understand it is rarely used by the developers. However, it remains a helpful utility for power users implementing custom StateStore types for external databases.
BTW, if we decide it should be internal, we can deprecate it in this KIP.

6. Performance & Lazy Deserialization

Thank you, Lucas, for raising the point about Iterators. I have added a "Performance Considerations" subsection. While the varint-based format requires scanning to find payload offsets, we can implement a "semi-lazy" approach. This allows us to scan varints to determine offsets without the high GC overhead of allocating objects for data that may not be accessed during a range scan.

7. Changelog Interaction

To clarify: record headers will be preserved in the changelog topic as native Kafka record headers.

Bests,
Alieh

On Tue, Jan 13, 2026 at 7:10 PM TengYao Chi <[email protected]> wrote:

> Hi Matthias,
>
> From my understanding (please correct me if I am wrong):
> KV/Window Stores: Use ValueTimestampHeaders<V> because they store
> individual records that always require a timestamp.
>
> Session Stores: Use AggregationWithHeaders<AGG> because the AGG result is
> generic (e.g., Long, String, or POJO), and session boundaries are managed
> by the store rather than the value wrapper.
>
> Additionally, during session merges, the application logic must decide
> which headers to persist. This wrapper provides a clean way to store that
> custom-processed metadata. The downside is that the user must explicitly
> handle how to merge headers.
>
> I am also wondering if we could just use ValueTimestampHeaders directly to
> simplify the API?
>
> Best,
> TengYao
>
> On 2026/01/12 18:44:54 Alieh Saeedi via dev wrote:
> > Hey Chia-Ping,
> >
> > Thanks for the feedback.
> >
> > chia_0: the headers must be serialized as a byte array. I updated the KIP
> > about that.
> > chia_1: this is the reason different store types are introduced. The newly
> > introduced stores keep the headers, while the current ones do not.
> > chia_2: that’s a valid point. I think it’s better to keep the order rather
> > than change it.
> >
> >
> > Bests,
> > Alieh
> >
> > On Sat, Jan 10, 2026 at 2:21 AM Chia-Ping Tsai <[email protected]> wrote:
> >
> > > hi Alieh
> > >
> > > Thanks for the KIP. This proposal seems to open the door for many
> > > interesting use cases. I have a few questions.
> > >
> > > chia_0: could you clarify the serialization format of headers_bytes?
> > >
> > > chia_1: how does the state store distinguish between legacy values and
> > > new values with headers? Since the new format starts with a 2-byte length,
> > > is there a risk of ambiguity with existing data?
> > >
> > > chia_2: does the implementation guarantee that the order of headers is
> > > preserved?
> > >
> > > Best,
> > > Chia-Ping
> > >
> > > > On Jan 10, 2026, at 6:14 AM, Alieh Saeedi via dev <[email protected]> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
> > > > Streams state stores to preserve record headers.
> > > > This would let header-based metadata like schema IDs, tracing info, and
> > > > feature flags be stored and restored alongside values.
> > > > The KIP introduces header-aware store types and a small config to cap the
> > > > size of headers written into state.
> > > > Details are in the KIP:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
> > > > .
> > > > I’d appreciate your feedback and questions on the proposal.
> > > >
> > > > Thanks,
> > > > Alieh
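
For readers following the thread, here is a minimal sketch of one possible reading of the layout described in items 1 and 6 of the top reply: a varint header count, then a varint-prefixed key and value for each header, with a "semi-lazy" scan that only computes offsets. Everything named here is hypothetical and not taken from the KIP: the class HeaderCodecSketch, its methods, the use of unsigned LEB128 varints, UTF-8 keys, non-null values, and a Map (which cannot hold the duplicate keys that real Kafka headers allow).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not the KIP's actual serialization code.
final class HeaderCodecSketch {

    // Writes an unsigned LEB128 varint: 7 payload bits per byte, high bit means "more bytes follow".
    static void writeVarint(int value, ByteArrayOutputStream out) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Reads a varint starting at pos[0] and advances pos[0] past it.
    static int readVarint(byte[] buf, int[] pos) {
        int result = 0;
        int shift = 0;
        while (true) {
            byte b = buf[pos[0]++];
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint longer than 5 bytes");
            }
        }
    }

    // Assumed layout: [count][keyLen][key][valueLen][value]... in insertion order.
    static byte[] encode(Map<String, byte[]> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Map.Entry<String, byte[]> e : headers.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = e.getValue();
            writeVarint(key.length, out);
            out.write(key, 0, key.length);
            writeVarint(value.length, out);
            out.write(value, 0, value.length);
        }
        return out.toByteArray();
    }

    // Semi-lazy lookup: walks the varints and returns {offset, length} of the
    // matching value without allocating a String or byte[] per header.
    // Returns null if the key is absent. Arrays.equals with ranges needs Java 9+.
    static int[] locateValue(byte[] buf, byte[] wantedKey) {
        int[] pos = {0};
        int count = readVarint(buf, pos);
        for (int i = 0; i < count; i++) {
            int keyLen = readVarint(buf, pos);
            int keyStart = pos[0];
            pos[0] += keyLen;
            int valueLen = readVarint(buf, pos);
            int valueStart = pos[0];
            pos[0] += valueLen;
            if (keyLen == wantedKey.length
                    && Arrays.equals(buf, keyStart, keyStart + keyLen, wantedKey, 0, keyLen)) {
                return new int[] {valueStart, valueLen};
            }
        }
        return null;
    }

    // Offset of the first byte after the headers block, found by skipping every entry.
    static int headersEnd(byte[] buf) {
        int[] pos = {0};
        int count = readVarint(buf, pos);
        for (int i = 0; i < 2 * count; i++) { // one key and one value per header
            int len = readVarint(buf, pos);
            pos[0] += len;
        }
        return pos[0];
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("schema-id", new byte[] {0, 0, 0, 42});
        headers.put("trace", "abc".getBytes(StandardCharsets.UTF_8));
        byte[] encoded = encode(headers);
        int[] loc = locateValue(encoded, "trace".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(encoded, loc[0], loc[1], StandardCharsets.UTF_8));
    }
}
```

In this sketch a range scan that never touches headers would only call headersEnd to find where the rest of the stored bytes begin, and a caller interested in a single header would pay for one key comparison per entry rather than for materializing every header object.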
