Great discussion! Seems we are heading in the right direction.
Thanks for clarifying the open questions about the header serialization
format, VersionedRecordWithHeaders, StateSerdes, and the upgrade path.
A few follow-up questions:
MJS-5: As we are keeping `header_size` now, I am wondering if there
would be a benefit to changing the byte format to the same order as used
in Kafka messages, i.e.,
[payload_size][payload][headers_byte]
The only disadvantage I see is that I expect `header_size` to be
smaller than `payload_size` in most cases, so we might need a little
bit more space on average for the var-int encoding. But in both cases,
we would be able to implement lazy deserialization. Not saying we have
to do it this way -- in general I agree there is not much benefit in using
the same order as Kafka messages do, as was already pointed out. Just
wanted to mention it for completeness. Thoughts?
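To make the two orderings concrete, here is a minimal Java sketch of both layouts. It is only an illustration under stated assumptions: the class and method names are hypothetical, headers and payload are treated as opaque byte arrays, and the size prefix is assumed to be an unsigned varint. The KIP's actual serialization may differ.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not the KIP's classes or its exact wire format.
final class HeaderLayoutSketch {

    // Unsigned varint: 7 data bits per byte, high bit set when more bytes follow.
    static void writeUnsignedVarint(int value, ByteBuffer out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer in) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = in.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
            if (shift > 28) {
                throw new IllegalArgumentException("varint is too long");
            }
        }
        return value | (b << shift);
    }

    // Writes [size(first)][first][second]; only the first part is length-prefixed.
    static byte[] encode(byte[] first, byte[] second) {
        ByteBuffer buf = ByteBuffer.allocate(5 + first.length + second.length);
        writeUnsignedVarint(first.length, buf);
        buf.put(first);
        buf.put(second);
        buf.flip();
        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        return out;
    }

    // Layout A: [header_size][headers][payload]
    static byte[] encodeHeadersFirst(byte[] headers, byte[] payload) {
        return encode(headers, payload);
    }

    // Layout B: [payload_size][payload][headers]
    static byte[] encodePayloadFirst(byte[] headers, byte[] payload) {
        return encode(payload, headers);
    }

    // Lazy read for layout A: jump over the header bytes without parsing them.
    static byte[] payloadFromHeadersFirst(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        int headerSize = readUnsignedVarint(buf);
        buf.position(buf.position() + headerSize);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    // Lazy read for layout B: read the payload and never touch the trailing headers.
    static byte[] payloadFromPayloadFirst(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        byte[] payload = new byte[readUnsignedVarint(buf)];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] headers = "trace-id=42".getBytes(StandardCharsets.UTF_8);
        byte[] payload = "value".getBytes(StandardCharsets.UTF_8);
        byte[] a = encodeHeadersFirst(headers, payload);
        byte[] b = encodePayloadFirst(headers, payload);
        System.out.println(new String(payloadFromHeadersFirst(a), StandardCharsets.UTF_8));
        System.out.println(new String(payloadFromPayloadFirst(b), StandardCharsets.UTF_8));
    }
}
```

In this sketch both layouts reach the payload without deserializing the headers. The only difference is which length gets the varint prefix, which is where the small space trade-off mentioned above comes from.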
MJS-5-B: One request though: the KIP should explain why we need to add
`header_size` (or `payload_size` in case we really make this change).
Reading the KIP as-is, I would always ask myself why we would need
`header_size` -- so explicitly mentioning lazy deserialization as the
reason for adding this field would be great, to avoid puzzling readers. --
The KIP mentions lazy deserialization later in the "Compatibility"
section, but does not make the connection to the `header_size` field
explicit in that section either.
MJS-6. For the upgrade path the KIP mentions
Window/Session: Employs a clean break at the segment level—old segments stay
as-is; new segments use the new format.
I am wondering why we do it this way? Did KIP-258 also do this? (I cannot
remember.) It's an interesting idea. I am just wondering about pros/cons
compared to following the same dual-cf-accessor path as we do for
non-windowed stores. Also from an implementation POV -- would it be more
or less code to write?
MJS-7. In the "Compatibility" section the KIP states
Backward Compatibility
- Public API: No existing APIs are deprecated. The new header-aware interfaces
and factory methods are additive.
As we deprecate some methods on `StateSerdes` now, this is not correct
any longer and should be updated.
MJS-8: Testing. -- There is no mention of system tests. And maybe we
don't need any. But it might be good to be explicit. Did KIP-258 add new
system tests?
@TengYao: Yes, your understanding of KS/Windowed vs Session store is
correct. It's really all about the optimization to avoid storing "event
time" for sessions twice, as we know "event time == window-end". That's
why using `ValueTimestampHeaders` for header-session store might not be
ideal, as we would lose this optimization. Introducing
`AggregationWithHeaders` is an attempt to keep this optimization though.
-Matthias
On 1/16/26 9:00 AM, Alieh Saeedi via dev wrote:
Updates to KIP
1- A varint header_size field is introduced to enable lazy deserialization
when scanning large ranges.
2- The current serialization/deserialization methods in StateSerdes are
marked as deprecated to keep the class concise.
3- Note that VersionedKeyValueStoreWithHeaders cannot extend
VersionedKeyValueStore because their methods differ in input and/or output
types. In particular, the VersionedRecord returned by VersionedKeyValueStore
methods is a final class and therefore cannot be subclassed.
Thanks,
Alieh
On Thu, Jan 15, 2026 at 4:46 PM Chia-Ping Tsai <[email protected]> wrote:
chia_03: Regarding the header size, using a Varint is consistent with
Kafka's serialization standards. It avoids the overhead of a large
fixed-size field while still achieving the efficient skipping capability we
want.
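As a rough illustration of the space argument, the following Java sketch counts how many bytes an unsigned varint needs for a few header sizes and compares that with a fixed 4-byte int. The class name and the sample sizes are made up for this example, and an unsigned (non-zigzag) varint is assumed. If the KIP ends up using a signed zigzag varint, the byte-count thresholds would differ.

```java
// Hypothetical sketch: byte cost of an unsigned varint length prefix.
final class VarintSizeSketch {

    // Number of bytes an unsigned varint needs (7 data bits per byte).
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        // Sample header sizes chosen only for illustration.
        int[] headerSizes = {0, 100, 127, 128, 16_383, 16_384, 1_000_000};
        for (int size : headerSizes) {
            System.out.printf("header_size=%d: varint=%d byte(s), fixed int=%d bytes%n",
                size, sizeOfUnsignedVarint(size), Integer.BYTES);
        }
    }
}
```

Under these assumptions, any header block up to 127 bytes costs a single prefix byte and anything up to 16,383 bytes costs two, so small headers pay far less than a fixed-size field would.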
chia_04: That makes sense.
Alieh Saeedi via dev <[email protected]> wrote on Thu, Jan 15, 2026 at 10:59 PM:
Hi Chia-Ping Tsai,
Thanks for the feedback.
chia_03: The difficulty with adding a header length is deciding between a
fixed-size field for all records or a configuration allowing users to
define a maximum. Alternatively, we could consider using a varint for the
header length to remain flexible and space-efficient.
chia_04:
It only makes sense to give the second column family its own RocksDB
config if its access pattern or data characteristics are materially
different.
Here we have the same keys, the same or very similar read/write patterns
(e.g., same get, put, range queries), and roughly comparable value sizes
(CF2 slightly larger per entry).
Then from RocksDB’s perspective the two CFs behave very similarly:
both are generic key–value blobs, written and read with the same
pattern. Most of the important RocksDB options (compaction style,
write buffer sizes, block cache, bloom filters, etc.) would be tuned
the same way for both.
Do you see a huge difference between these two?
Thanks,
Alieh
On Thu, Jan 15, 2026 at 3:03 AM Chia-Ping Tsai <[email protected]>
wrote:
hi
chia_03: should we provide a more effective way to load the value without
scanning the header bytes? (e.g., by storing the total size of headers)
chia_04: Do we need to allow separate RocksDB configuration for the new
column family?
Best,
Chia-Ping
On 2026/01/09 22:14:18 Alieh Saeedi via dev wrote:
Hi all,
I’d like to start a discussion on KIP-1271, which proposes allowing Kafka
Streams state stores to preserve record headers.
This would let header-based metadata like schema IDs, tracing info, and
feature flags be stored and restored alongside values.
The KIP introduces header-aware store types and a small config to cap the
size of headers written into state.
Details are in the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores
.
I’d appreciate your feedback and questions on the proposal.
Thanks,
Alieh