This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit e409be00a6ccd5604a5d047f9560c5f91212ca09
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 31 12:52:22 2026 +0200

    refactor(server,sdk): add wire validation for user headers and fix encoding defects
    
    User headers had several defects: no structural validation on
    server ingest (garbage bytes were stored and only discovered
    at consumer read time as silent Ok(None)), non-deterministic
    encoding from HashMap iteration order affecting the HTTP path,
    wrong error type (IggyError::InvalidCommand for unknown header
    kind), double heap allocation per field in the decoder, and
    zero pre-allocation in the encoder.
    
    Add wire-level TLV validation primitives to binary_protocol:
    WireHeaderKind(u8) newtype (forward-compatible for VSR rolling
    upgrades), validate_user_headers() for structural checks,
    WireUserHeaderIterator for zero-copy iteration, and encode/size
    helpers. The server now validates header structure on ingest
    before persistence.
    
    Migrate UserHeaders from HashMap to BTreeMap for deterministic
    serialization order. Rewrite user_headers_to_bytes with
    pre-allocated buffer via binary_protocol encoder. Rewrite
    user_headers_from_bytes to take &[u8] and use the zero-copy
    iterator, fixing the double .to_vec() allocation bug. Fix
    silent Ok(None) in both user_headers_map() impls to propagate
    errors. Replace IggyError::InvalidCommand with new
    InvalidHeaderKind(u8) = 4038 variant.
---
 Cargo.lock                                         |   2 +
 DEPENDENCIES.md                                    |  86 ++--
 core/binary_protocol/src/lib.rs                    |   4 +
 core/binary_protocol/src/primitives/mod.rs         |   1 +
 .../binary_protocol/src/primitives/user_headers.rs | 544 +++++++++++++++++++++
 core/cli/Cargo.toml                                |   1 +
 .../src/commands/binary_message/poll_messages.rs   |   9 +-
 .../src/commands/binary_message/send_messages.rs   |   4 +-
 core/common/src/error/iggy_error.rs                |   2 +
 core/common/src/http/messages/send_messages.rs     |   4 +-
 core/common/src/types/message/iggy_message.rs      |  72 +--
 core/common/src/types/message/message_view.rs      |  26 +-
 core/common/src/types/message/mod.rs               |   3 +-
 core/common/src/types/message/user_headers.rs      | 170 ++-----
 core/common/src/utils/crypto.rs                    |   4 +-
 core/common/src/wire_conversions.rs                |  75 ++-
 core/connectors/runtime/src/sink.rs                |   2 +-
 core/connectors/runtime/src/source.rs              |   4 +-
 core/connectors/sdk/src/lib.rs                     |  10 +-
 core/connectors/sinks/http_sink/src/lib.rs         |   3 +-
 .../tests/cli/message/test_message_poll_command.rs |   4 +-
 .../message/test_message_poll_to_file_command.rs   |   8 +-
 .../cli/message/test_message_reply_via_file.rs     |   4 +-
 .../tests/cli/message/test_message_send_command.rs |   8 +-
 .../message/test_message_send_from_file_command.rs |   8 +-
 .../server/scenarios/create_message_payload.rs     |   6 +-
 .../tests/server/scenarios/encryption_scenario.rs  |   8 +-
 .../server/scenarios/message_headers_scenario.rs   |   6 +-
 .../server/scenarios/message_size_scenario.rs      |   6 +-
 .../tests/server/scenarios/offset_scenario.rs      |  10 +-
 .../tests/server/scenarios/timestamp_scenario.rs   |  10 +-
 core/sdk/src/prelude.rs                            |   1 -
 core/tools/src/data-seeder/seeder.rs               |   4 +-
 examples/rust/Cargo.toml                           |   1 +
 .../message-compression/consumer/main.rs           |  12 +-
 .../message-compression/producer/main.rs           |   4 +-
 .../message-headers/message-type/producer/main.rs  |   4 +-
 .../message-headers/typed-headers/producer/main.rs |   4 +-
 38 files changed, 848 insertions(+), 286 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 66d8271d2..b5c76c369 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5305,6 +5305,7 @@ dependencies = [
  "dirs",
  "figlet-rs",
  "iggy",
+ "iggy_binary_protocol",
  "iggy_common",
  "keyring",
  "passterm",
@@ -5739,6 +5740,7 @@ dependencies = [
  "clap",
  "futures-util",
  "iggy",
+ "iggy_common",
  "lz4_flex 0.13.0",
  "rand 0.10.0",
  "serde",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 9b019c26b..0135e7ed2 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -80,7 +80,7 @@ av-scenechange: 0.14.1, "MIT",
 av1-grain: 0.2.5, "BSD-2-Clause",
 avif-serialize: 0.8.8, "BSD-3-Clause",
 aws-lc-rs: 1.16.2, "(Apache-2.0 OR ISC) AND ISC",
-aws-lc-sys: 0.39.0, "(Apache-2.0 OR ISC OR MIT) AND (Apache-2.0 OR ISC OR MIT-0) AND (Apache-2.0 OR ISC) AND Apache-2.0 AND BSD-3-Clause AND ISC AND MIT",
+aws-lc-sys: 0.39.1, "(Apache-2.0 OR ISC OR MIT) AND (Apache-2.0 OR ISC OR MIT-0) AND (Apache-2.0 OR ISC) AND Apache-2.0 AND BSD-3-Clause AND ISC AND MIT",
 axum: 0.8.8, "MIT",
 axum-core: 0.5.6, "MIT",
 axum-macros: 0.5.0, "MIT",
@@ -112,7 +112,7 @@ bitflags: 2.11.0, "Apache-2.0 OR MIT",
 bitstream-io: 4.9.0, "Apache-2.0 OR MIT",
 bitvec: 1.0.1, "MIT",
 blake2: 0.10.6, "Apache-2.0 OR MIT",
-blake3: 1.8.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0",
+blake3: 1.8.4, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0",
 block-buffer: 0.10.4, "Apache-2.0 OR MIT",
 block2: 0.6.2, "MIT",
 bnum: 0.12.1, "Apache-2.0 OR MIT",
@@ -145,7 +145,7 @@ capacity_builder: 0.5.0, "MIT",
 capacity_builder_macros: 0.3.0, "MIT",
 cargo-platform: 0.3.2, "Apache-2.0 OR MIT",
 cargo_metadata: 0.23.1, "MIT",
-cc: 1.2.57, "Apache-2.0 OR MIT",
+cc: 1.2.58, "Apache-2.0 OR MIT",
 cesu8: 1.1.0, "Apache-2.0 OR MIT",
 cexpr: 0.6.0, "Apache-2.0 OR MIT",
 cfg-if: 1.0.4, "Apache-2.0 OR MIT",
@@ -162,7 +162,7 @@ clap_complete: 4.6.0, "Apache-2.0 OR MIT",
 clap_derive: 4.6.0, "Apache-2.0 OR MIT",
 clap_lex: 1.1.0, "Apache-2.0 OR MIT",
 clock: 0.1.0, "N/A",
-cmake: 0.1.57, "Apache-2.0 OR MIT",
+cmake: 0.1.58, "Apache-2.0 OR MIT",
 cobs: 0.3.0, "Apache-2.0 OR MIT",
 color_quant: 1.1.0, "MIT",
 colorchoice: 1.0.5, "Apache-2.0 OR MIT",
@@ -171,13 +171,13 @@ combine: 4.6.7, "MIT",
 comfy-table: 7.2.2, "MIT",
 compio: 0.18.0, "MIT",
 compio-buf: 0.8.1, "MIT",
-compio-driver: 0.11.3, "MIT",
+compio-driver: 0.11.4, "MIT",
 compio-fs: 0.11.0, "MIT",
 compio-io: 0.9.1, "MIT",
 compio-log: 0.1.0, "MIT",
 compio-macros: 0.1.2, "MIT",
 compio-net: 0.11.1, "MIT",
-compio-quic: 0.7.1, "MIT",
+compio-quic: 0.7.2, "MIT",
 compio-runtime: 0.11.0, "MIT",
 compio-tls: 0.9.0, "MIT",
 compio-ws: 0.3.0, "MIT",
@@ -270,7 +270,7 @@ derive_more: 2.1.1, "MIT",
 derive_more-impl: 2.1.1, "MIT",
 difflib: 0.4.0, "MIT",
 digest: 0.10.7, "Apache-2.0 OR MIT",
-dircpy: 0.3.19, "MIT",
+dircpy: 0.3.20, "MIT",
 dirs: 6.0.0, "Apache-2.0 OR MIT",
 dirs-sys: 0.5.0, "Apache-2.0 OR MIT",
 dispatch2: 0.3.1, "Apache-2.0 OR MIT OR Zlib",
@@ -430,7 +430,7 @@ human-repr: 1.1.0, "MIT",
 humantime: 2.3.0, "Apache-2.0 OR MIT",
 hwlocality: 1.0.0-alpha.11, "MIT",
 hwlocality-sys: 0.6.4, "MIT",
-hyper: 1.8.1, "MIT",
+hyper: 1.9.0, "MIT",
 hyper-named-pipe: 0.1.0, "Apache-2.0",
 hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT",
 hyper-timeout: 0.5.2, "Apache-2.0 OR MIT",
@@ -493,13 +493,13 @@ inout: 0.1.4, "Apache-2.0 OR MIT",
 integer-encoding: 3.0.4, "MIT",
 integration: 0.0.1, "Apache-2.0",
 interpolate_name: 0.2.4, "MIT",
-inventory: 0.3.22, "Apache-2.0 OR MIT",
+inventory: 0.3.24, "Apache-2.0 OR MIT",
 io-uring: 0.7.11, "Apache-2.0 OR MIT",
 io_uring_buf_ring: 0.2.3, "MIT",
 iobuf: 0.1.0, "Apache-2.0",
 ipconfig: 0.3.4, "Apache-2.0 OR MIT",
 ipnet: 2.12.0, "Apache-2.0 OR MIT",
-iri-string: 0.7.11, "Apache-2.0 OR MIT",
+iri-string: 0.7.12, "Apache-2.0 OR MIT",
 is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT",
 itertools: 0.13.0, "Apache-2.0 OR MIT",
 itertools: 0.14.0, "Apache-2.0 OR MIT",
@@ -514,7 +514,7 @@ jni-sys: 0.4.1, "Apache-2.0 OR MIT",
 jni-sys-macros: 0.4.1, "Apache-2.0 OR MIT",
 jobserver: 0.1.34, "Apache-2.0 OR MIT",
 journal: 0.1.0, "Apache-2.0",
-js-sys: 0.3.91, "Apache-2.0 OR MIT",
+js-sys: 0.3.94, "Apache-2.0 OR MIT",
 jsonwebtoken: 10.3.0, "MIT",
 jwalk: 0.8.1, "MIT",
 keccak: 0.1.6, "Apache-2.0 OR MIT",
@@ -536,7 +536,7 @@ lexical-util: 1.0.7, "Apache-2.0 OR MIT",
 lexical-write-float: 1.0.6, "Apache-2.0 OR MIT",
 lexical-write-integer: 1.0.6, "Apache-2.0 OR MIT",
 libbz2-rs-sys: 0.2.2, "bzip2-1.0.6",
-libc: 0.2.183, "Apache-2.0 OR MIT",
+libc: 0.2.184, "Apache-2.0 OR MIT",
 libdbus-sys: 0.2.7, "Apache-2.0 OR MIT",
 libfuzzer-sys: 0.4.12, "(Apache-2.0 OR MIT) AND NCSA",
 libgit2-sys: 0.18.3+1.9.2, "Apache-2.0 OR MIT",
@@ -545,7 +545,7 @@ liblzma: 0.4.6, "Apache-2.0 OR MIT",
 liblzma-sys: 0.4.5, "Apache-2.0 OR MIT",
 libm: 0.2.16, "MIT",
 libmimalloc-sys: 0.1.44, "MIT",
-libredox: 0.1.14, "MIT",
+libredox: 0.1.15, "MIT",
 libsqlite3-sys: 0.30.1, "MIT",
 libz-sys: 1.1.25, "Apache-2.0 OR MIT",
 linked-hash-map: 0.5.6, "Apache-2.0 OR MIT",
@@ -586,7 +586,7 @@ mime: 0.3.17, "Apache-2.0 OR MIT",
 mime_guess: 2.0.5, "MIT",
 minimal-lexical: 0.2.1, "Apache-2.0 OR MIT",
 miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib",
-mio: 1.1.1, "MIT",
+mio: 1.2.0, "MIT",
 mockall: 0.14.0, "Apache-2.0 OR MIT",
 mockall_derive: 0.14.0, "Apache-2.0 OR MIT",
 moka: 0.12.15, "(Apache-2.0 OR MIT) AND Apache-2.0",
@@ -616,7 +616,7 @@ num: 0.4.3, "Apache-2.0 OR MIT",
 num-bigint: 0.4.6, "Apache-2.0 OR MIT",
 num-bigint-dig: 0.8.6, "Apache-2.0 OR MIT",
 num-complex: 0.4.6, "Apache-2.0 OR MIT",
-num-conv: 0.2.0, "Apache-2.0 OR MIT",
+num-conv: 0.2.1, "Apache-2.0 OR MIT",
 num-derive: 0.4.2, "Apache-2.0 OR MIT",
 num-integer: 0.1.46, "Apache-2.0 OR MIT",
 num-iter: 0.1.45, "Apache-2.0 OR MIT",
@@ -630,7 +630,7 @@ objc2: 0.6.4, "MIT",
 objc2-core-foundation: 0.3.2, "Apache-2.0 OR MIT OR Zlib",
 objc2-encode: 4.1.0, "MIT",
 objc2-io-kit: 0.3.2, "Apache-2.0 OR MIT OR Zlib",
-octocrab: 0.49.6, "Apache-2.0 OR MIT",
+octocrab: 0.49.7, "Apache-2.0 OR MIT",
 oid-registry: 0.8.1, "Apache-2.0 OR MIT",
 once_cell: 1.21.4, "Apache-2.0 OR MIT",
 once_cell_polyfill: 1.70.2, "Apache-2.0 OR MIT",
@@ -656,7 +656,7 @@ os_pipe: 1.2.3, "MIT",
 outref: 0.5.2, "MIT",
 p256: 0.13.2, "Apache-2.0 OR MIT",
 p384: 0.13.1, "Apache-2.0 OR MIT",
-papaya: 0.2.3, "MIT",
+papaya: 0.2.4, "MIT",
 parking: 2.2.1, "Apache-2.0 OR MIT",
 parking_lot: 0.12.5, "Apache-2.0 OR MIT",
 parking_lot_core: 0.9.12, "Apache-2.0 OR MIT",
@@ -718,7 +718,7 @@ proc-macro2: 1.0.106, "Apache-2.0 OR MIT",
 proc-macro2-diagnostics: 0.10.1, "Apache-2.0 OR MIT",
 profiling: 1.0.17, "Apache-2.0 OR MIT",
 profiling-procmacros: 1.0.17, "Apache-2.0 OR MIT",
-prometheus-client: 0.24.0, "Apache-2.0 OR MIT",
+prometheus-client: 0.24.1, "Apache-2.0 OR MIT",
 prometheus-client-derive-encode: 0.5.0, "Apache-2.0 OR MIT",
 prost: 0.14.3, "Apache-2.0",
 prost-derive: 0.14.3, "Apache-2.0",
@@ -782,8 +782,8 @@ ring: 0.17.14, "Apache-2.0 AND ISC",
 ringbuffer: 0.16.0, "MIT",
 rkyv: 0.7.46, "MIT",
 rkyv_derive: 0.7.46, "MIT",
-rmcp: 1.2.0, "Apache-2.0",
-rmcp-macros: 1.2.0, "Apache-2.0",
+rmcp: 1.3.0, "Apache-2.0",
+rmcp-macros: 1.3.0, "Apache-2.0",
 rmp: 0.8.15, "MIT",
 rmp-serde: 1.3.1, "MIT",
 roaring: 0.11.3, "Apache-2.0 OR MIT",
@@ -795,8 +795,8 @@ rust-embed: 8.11.0, "MIT",
 rust-embed-impl: 8.11.0, "MIT",
 rust-embed-utils: 8.11.0, "MIT",
 rust-ini: 0.21.3, "MIT",
-rust_decimal: 1.40.0, "MIT",
-rustc-hash: 2.1.1, "Apache-2.0 OR MIT",
+rust_decimal: 1.41.0, "MIT",
+rustc-hash: 2.1.2, "Apache-2.0 OR MIT",
 rustc_version: 0.4.1, "Apache-2.0 OR MIT",
 rustc_version_runtime: 0.3.0, "MIT",
 rusticata-macros: 4.1.0, "Apache-2.0 OR MIT",
@@ -842,7 +842,7 @@ serde_json: 1.0.149, "Apache-2.0 OR MIT",
 serde_path_to_error: 0.1.20, "Apache-2.0 OR MIT",
 serde_repr: 0.1.20, "Apache-2.0 OR MIT",
 serde_spanned: 0.6.9, "Apache-2.0 OR MIT",
-serde_spanned: 1.1.0, "Apache-2.0 OR MIT",
+serde_spanned: 1.1.1, "Apache-2.0 OR MIT",
 serde_urlencoded: 0.7.1, "Apache-2.0 OR MIT",
 serde_v8: 0.260.0, "MIT",
 serde_with: 3.18.0, "Apache-2.0 OR MIT",
@@ -859,7 +859,7 @@ sharded-slab: 0.1.7, "MIT",
 shlex: 1.3.0, "Apache-2.0 OR MIT",
 signal-hook-registry: 1.4.8, "Apache-2.0 OR MIT",
 signature: 2.2.0, "Apache-2.0 OR MIT",
-simd-adler32: 0.3.8, "MIT",
+simd-adler32: 0.3.9, "MIT",
 simd-json: 0.17.0, "Apache-2.0 OR MIT",
 simd_helpers: 0.1.0, "MIT",
 simdutf8: 0.1.5, "Apache-2.0 OR MIT",
@@ -907,12 +907,12 @@ svgtypes: 0.15.3, "Apache-2.0 OR MIT",
 syn: 1.0.109, "Apache-2.0 OR MIT",
 syn: 2.0.117, "Apache-2.0 OR MIT",
 sync_wrapper: 1.0.2, "Apache-2.0",
-synchrony: 0.1.6, "MIT",
+synchrony: 0.1.7, "MIT",
 synstructure: 0.13.2, "MIT",
 synthez: 0.4.0, "BlueOak-1.0.0",
 synthez-codegen: 0.4.0, "BlueOak-1.0.0",
 synthez-core: 0.4.0, "BlueOak-1.0.0",
-sys_traits: 0.1.26, "MIT",
+sys_traits: 0.1.27, "MIT",
 sys_traits_macros: 0.1.0, "MIT",
 sysinfo: 0.37.2, "MIT",
 sysinfo: 0.38.4, "MIT",
@@ -954,15 +954,15 @@ tokio-tungstenite: 0.29.0, "MIT",
 tokio-util: 0.7.18, "MIT",
 tokise: 0.2.1, "Apache-2.0 OR MIT",
 toml: 0.8.23, "Apache-2.0 OR MIT",
-toml: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT",
+toml: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT",
 toml_datetime: 0.6.11, "Apache-2.0 OR MIT",
-toml_datetime: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT",
+toml_datetime: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT",
 toml_edit: 0.19.15, "Apache-2.0 OR MIT",
 toml_edit: 0.22.27, "Apache-2.0 OR MIT",
-toml_edit: 0.25.8+spec-1.1.0, "Apache-2.0 OR MIT",
-toml_parser: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT",
+toml_edit: 0.25.9+spec-1.1.0, "Apache-2.0 OR MIT",
+toml_parser: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT",
 toml_write: 0.1.2, "Apache-2.0 OR MIT",
-toml_writer: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT",
+toml_writer: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT",
 tonic: 0.14.5, "MIT",
 tonic-prost: 0.14.5, "MIT",
 tools: 0.1.0, "Apache-2.0",
@@ -1007,7 +1007,7 @@ unicode-linebreak: 0.1.5, "Apache-2.0",
 unicode-normalization: 0.1.25, "Apache-2.0 OR MIT",
 unicode-properties: 0.1.4, "Apache-2.0 OR MIT",
 unicode-script: 0.5.8, "Apache-2.0 OR MIT",
-unicode-segmentation: 1.13.0, "Apache-2.0 OR MIT",
+unicode-segmentation: 1.13.2, "Apache-2.0 OR MIT",
 unicode-vo: 0.1.0, "Apache-2.0 OR MIT",
 unicode-width: 0.1.14, "Apache-2.0 OR MIT",
 unicode-width: 0.2.2, "Apache-2.0 OR MIT",
@@ -1025,7 +1025,7 @@ utf8-width: 0.1.8, "MIT",
 utf8-zero: 0.8.1, "Apache-2.0 OR MIT",
 utf8_iter: 1.0.4, "Apache-2.0 OR MIT",
 utf8parse: 0.2.2, "Apache-2.0 OR MIT",
-uuid: 1.22.0, "Apache-2.0 OR MIT",
+uuid: 1.23.0, "Apache-2.0 OR MIT",
 v8: 137.3.0, "MIT",
 v_frame: 0.3.9, "BSD-2-Clause",
 v_htmlescape: 0.15.8, "Apache-2.0 OR MIT",
@@ -1045,11 +1045,11 @@ wasi: 0.11.1+wasi-snapshot-preview1, "Apache-2.0 OR Apache-2.0 WITH LLVM-excepti
 wasip2: 1.0.2+wasi-0.2.9, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wasip3: 0.4.0+wasi-0.3.0-rc-2026-01-06, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wasite: 0.1.0, "Apache-2.0 OR BSL-1.0 OR MIT",
-wasm-bindgen: 0.2.114, "Apache-2.0 OR MIT",
-wasm-bindgen-futures: 0.4.64, "Apache-2.0 OR MIT",
-wasm-bindgen-macro: 0.2.114, "Apache-2.0 OR MIT",
-wasm-bindgen-macro-support: 0.2.114, "Apache-2.0 OR MIT",
-wasm-bindgen-shared: 0.2.114, "Apache-2.0 OR MIT",
+wasm-bindgen: 0.2.117, "Apache-2.0 OR MIT",
+wasm-bindgen-futures: 0.4.67, "Apache-2.0 OR MIT",
+wasm-bindgen-macro: 0.2.117, "Apache-2.0 OR MIT",
+wasm-bindgen-macro-support: 0.2.117, "Apache-2.0 OR MIT",
+wasm-bindgen-shared: 0.2.117, "Apache-2.0 OR MIT",
 wasm-encoder: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wasm-metadata: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wasm-streams: 0.4.2, "Apache-2.0 OR MIT",
@@ -1057,7 +1057,7 @@ wasm-streams: 0.5.0, "Apache-2.0 OR MIT",
 wasm_dep_analyzer: 0.3.0, "MIT",
 wasmparser: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wasmtimer: 0.4.3, "MIT",
-web-sys: 0.3.91, "Apache-2.0 OR MIT",
+web-sys: 0.3.94, "Apache-2.0 OR MIT",
 web-time: 1.1.0, "Apache-2.0 OR MIT",
 webpki-root-certs: 1.0.6, "CDLA-Permissive-2.0",
 webpki-roots: 0.26.11, "CDLA-Permissive-2.0",
@@ -1133,7 +1133,7 @@ windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT",
 windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT",
 winnow: 0.5.40, "MIT",
 winnow: 0.7.15, "MIT",
-winnow: 1.0.0, "MIT",
+winnow: 1.0.1, "MIT",
 winsafe: 0.0.19, "MIT",
 wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
 wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT",
@@ -1155,8 +1155,8 @@ yew-router: 0.20.0, "Apache-2.0 OR MIT",
 yew-router-macro: 0.20.0, "Apache-2.0 OR MIT",
 yoke: 0.8.1, "Unicode-3.0",
 yoke-derive: 0.8.1, "Unicode-3.0",
-zerocopy: 0.8.47, "Apache-2.0 OR BSD-2-Clause OR MIT",
-zerocopy-derive: 0.8.47, "Apache-2.0 OR BSD-2-Clause OR MIT",
+zerocopy: 0.8.48, "Apache-2.0 OR BSD-2-Clause OR MIT",
+zerocopy-derive: 0.8.48, "Apache-2.0 OR BSD-2-Clause OR MIT",
 zerofrom: 0.1.6, "Unicode-3.0",
 zerofrom-derive: 0.1.6, "Unicode-3.0",
 zeroize: 1.8.2, "Apache-2.0 OR MIT",
@@ -1176,4 +1176,4 @@ zune-core: 0.4.12, "Apache-2.0 OR MIT OR Zlib",
 zune-core: 0.5.1, "Apache-2.0 OR MIT OR Zlib",
 zune-inflate: 0.2.54, "Apache-2.0 OR MIT OR Zlib",
 zune-jpeg: 0.4.21, "Apache-2.0 OR MIT OR Zlib",
-zune-jpeg: 0.5.14, "Apache-2.0 OR MIT OR Zlib",
+zune-jpeg: 0.5.15, "Apache-2.0 OR MIT OR Zlib",
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 62476f3ea..a63aad99a 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -87,6 +87,10 @@ pub use primitives::permissions::{
     WireGlobalPermissions, WirePermissions, WireStreamPermissions, 
WireTopicPermissions,
 };
 pub use primitives::polling_strategy::WirePollingStrategy;
+pub use primitives::user_headers::{
+    WireHeaderKind, WireUserHeaderEntry, WireUserHeaderIterator, 
WireUserHeaders,
+    encode_user_headers, user_headers_encoded_size, validate_user_headers,
+};
 
 /// Maximum number of partitions allowed in a single create/delete request.
 pub const MAX_PARTITIONS_PER_REQUEST: u32 = 1000;
diff --git a/core/binary_protocol/src/primitives/mod.rs b/core/binary_protocol/src/primitives/mod.rs
index dcbef623e..d1d7ee56a 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ b/core/binary_protocol/src/primitives/mod.rs
@@ -22,3 +22,4 @@ pub mod identifier;
 pub mod partitioning;
 pub mod permissions;
 pub mod polling_strategy;
+pub mod user_headers;
diff --git a/core/binary_protocol/src/primitives/user_headers.rs b/core/binary_protocol/src/primitives/user_headers.rs
new file mode 100644
index 000000000..2f49b39d6
--- /dev/null
+++ b/core/binary_protocol/src/primitives/user_headers.rs
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Wire-level primitives for user header TLV validation, iteration, and encoding.
+//!
+//! User headers are encoded as a sequence of key-value TLV pairs:
+//! ```text
+//! [key_kind:u8][key_len:u32_le][key_data:N][val_kind:u8][val_len:u32_le][val_data:M] ...
+//! ```
+//!
+//! This module provides structural validation and zero-copy iteration
+//! over these bytes without interpreting semantic meaning of kind codes.
+//! Domain-level interpretation (mapping kind codes to typed values)
+//! happens in `iggy_common`.
+
+use std::borrow::Cow;
+
+use bytes::{BufMut, Bytes, BytesMut};
+
+use crate::WireError;
+use crate::codec::{WireEncode, read_u8, read_u32_le};
+
+/// Maximum byte length for a single header key or value on the wire.
+const MAX_FIELD_LENGTH: u32 = 255;
+
+/// Opaque header kind tag as transmitted on the wire.
+///
+/// This is a `u8` newtype - NOT an exhaustive enum. New kind codes can be
+/// added without breaking structural validation, which is critical for
+/// VSR rolling upgrades where older nodes must forward messages containing
+/// kind codes they do not yet understand.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub struct WireHeaderKind(pub u8);
+
+impl WireHeaderKind {
+    /// Highest currently defined kind code (Float64 = 15).
+    pub const KNOWN_MAX: u8 = 15;
+
+    /// Returns `true` if the kind code is within the currently defined range.
+    #[must_use]
+    pub fn is_known(self) -> bool {
+        (1..=Self::KNOWN_MAX).contains(&self.0)
+    }
+}
+
+/// A single key-value header entry borrowing the underlying buffer.
+#[derive(Debug)]
+pub struct WireUserHeaderEntry<'a> {
+    pub key_kind: WireHeaderKind,
+    pub key: &'a [u8],
+    pub value_kind: WireHeaderKind,
+    pub value: &'a [u8],
+}
+
+/// Validate the structural integrity of a user headers byte buffer.
+///
+/// Walks TLV pairs and checks:
+/// - Every kind byte is non-zero
+/// - Every length is in `1..=255`
+/// - No buffer overrun
+/// - Total consumed bytes equals `buf.len()` (no trailing garbage)
+/// - Even number of TLV entries (each pair = key + value)
+///
+/// Returns the number of complete key-value pairs on success.
+///
+/// # Errors
+///
+/// Returns `WireError::UnexpectedEof` if the buffer is truncated mid-entry,
+/// or `WireError::Validation` if structural constraints are violated.
+pub fn validate_user_headers(buf: &[u8]) -> Result<u32, WireError> {
+    if buf.is_empty() {
+        return Ok(0);
+    }
+
+    let mut pos = 0;
+    let mut tlv_count: u32 = 0;
+
+    while pos < buf.len() {
+        let kind = read_u8(buf, pos)?;
+        if kind == 0 {
+            return Err(WireError::Validation(Cow::Owned(format!(
+                "header kind is 0 (reserved) at offset {pos}"
+            ))));
+        }
+        pos += 1;
+
+        let length = read_u32_le(buf, pos)?;
+        pos += 4;
+
+        if length == 0 || length > MAX_FIELD_LENGTH {
+            return Err(WireError::Validation(Cow::Owned(format!(
+                "header field length {length} out of range 
1..={MAX_FIELD_LENGTH} at offset {}",
+                pos - 4
+            ))));
+        }
+
+        let data_end = pos
+            .checked_add(length as usize)
+            .ok_or(WireError::Validation(Cow::Borrowed(
+                "header field length overflow",
+            )))?;
+        if data_end > buf.len() {
+            return Err(WireError::UnexpectedEof {
+                offset: pos,
+                need: length as usize,
+                have: buf.len() - pos,
+            });
+        }
+        pos = data_end;
+
+        tlv_count = tlv_count
+            .checked_add(1)
+            .ok_or(WireError::Validation(Cow::Borrowed(
+                "header entry count overflow",
+            )))?;
+    }
+
+    if !tlv_count.is_multiple_of(2) {
+        return Err(WireError::Validation(Cow::Owned(format!(
+            "odd number of TLV entries ({tlv_count}), expected key-value pairs"
+        ))));
+    }
+
+    Ok(tlv_count / 2)
+}
+
+/// Zero-copy iterator over pre-validated user header bytes.
+///
+/// Yields one [`WireUserHeaderEntry`] per key-value pair. The buffer
+/// **must** have been validated by [`validate_user_headers`] first;
+/// iterating an invalid buffer may yield garbage or panic.
+pub struct WireUserHeaderIterator<'a> {
+    buf: &'a [u8],
+    pos: usize,
+}
+
+impl<'a> WireUserHeaderIterator<'a> {
+    /// Create an iterator over a pre-validated user headers buffer.
+    #[must_use]
+    pub const fn new(buf: &'a [u8]) -> Self {
+        Self { buf, pos: 0 }
+    }
+}
+
+impl<'a> Iterator for WireUserHeaderIterator<'a> {
+    type Item = WireUserHeaderEntry<'a>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.pos >= self.buf.len() {
+            return None;
+        }
+
+        // Key TLV
+        let key_kind = self.buf[self.pos];
+        self.pos += 1;
+
+        let key_len = u32::from_le_bytes(
+            self.buf[self.pos..self.pos + 4]
+                .try_into()
+                .expect("pre-validated buffer"),
+        ) as usize;
+        self.pos += 4;
+
+        let key = &self.buf[self.pos..self.pos + key_len];
+        self.pos += key_len;
+
+        // Value TLV
+        let value_kind = self.buf[self.pos];
+        self.pos += 1;
+
+        let value_len = u32::from_le_bytes(
+            self.buf[self.pos..self.pos + 4]
+                .try_into()
+                .expect("pre-validated buffer"),
+        ) as usize;
+        self.pos += 4;
+
+        let value = &self.buf[self.pos..self.pos + value_len];
+        self.pos += value_len;
+
+        Some(WireUserHeaderEntry {
+            key_kind: WireHeaderKind(key_kind),
+            key,
+            value_kind: WireHeaderKind(value_kind),
+            value,
+        })
+    }
+}
+
+/// Calculate the exact encoded size for a set of header entries.
+///
+/// Each entry tuple is `(key_kind, key_data, value_kind, value_data)`.
+#[must_use]
+pub fn user_headers_encoded_size(entries: &[(u8, &[u8], u8, &[u8])]) -> usize {
+    entries
+        .iter()
+        .map(|(_, k, _, v)| 1 + 4 + k.len() + 1 + 4 + v.len())
+        .sum()
+}
+
+/// Encode header entries into a caller-owned buffer.
+///
+/// Each entry tuple is `(key_kind, key_data, value_kind, value_data)`.
+/// The caller should pre-allocate `buf` with [`user_headers_encoded_size`].
+pub fn encode_user_headers(entries: &[(u8, &[u8], u8, &[u8])], buf: &mut 
BytesMut) {
+    for &(key_kind, key_data, value_kind, value_data) in entries {
+        buf.put_u8(key_kind);
+        #[allow(clippy::cast_possible_truncation)]
+        buf.put_u32_le(key_data.len() as u32);
+        buf.put_slice(key_data);
+        buf.put_u8(value_kind);
+        #[allow(clippy::cast_possible_truncation)]
+        buf.put_u32_le(value_data.len() as u32);
+        buf.put_slice(value_data);
+    }
+}
+
+/// Pre-validated user headers as a contiguous TLV byte buffer.
+///
+/// Construction validates structural integrity (no partial TLVs, no zero kinds,
+/// no oversized fields, even entry count). The inner bytes are immutable.
+///
+/// This type is intentionally opaque at the wire layer. Domain-level
+/// interpretation of kind codes happens in `iggy_common` via the conversion
+/// bridge in `wire_conversions.rs`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct WireUserHeaders(Bytes);
+
+impl WireUserHeaders {
+    /// Validate and wrap a borrowed byte slice (copies into new `Bytes`).
+    ///
+    /// # Errors
+    /// Returns `WireError` if the buffer is not structurally valid TLV.
+    pub fn from_slice(buf: &[u8]) -> Result<Self, WireError> {
+        validate_user_headers(buf)?;
+        Ok(Self(Bytes::copy_from_slice(buf)))
+    }
+
+    /// Validate and wrap an owned `Bytes` buffer (zero-copy).
+    ///
+    /// # Errors
+    /// Returns `WireError` if the buffer is not structurally valid TLV.
+    pub fn from_bytes(buf: Bytes) -> Result<Self, WireError> {
+        validate_user_headers(&buf)?;
+        Ok(Self(buf))
+    }
+
+    /// Wrap pre-validated bytes without re-checking.
+    ///
+    /// # Safety contract (not `unsafe`, but caller must ensure):
+    /// The bytes must be structurally valid TLV as defined by
+    /// [`validate_user_headers`]. Iterating invalid bytes will panic.
+    pub const fn from_validated(buf: Bytes) -> Self {
+        Self(buf)
+    }
+
+    /// Empty headers (zero-length buffer).
+    #[must_use]
+    pub const fn empty() -> Self {
+        Self(Bytes::new())
+    }
+
+    #[must_use]
+    pub const fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+
+    /// The raw validated bytes, suitable for wire transmission.
+    #[must_use]
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.0
+    }
+
+    /// Consume into the inner `Bytes` handle (zero-copy).
+    #[must_use]
+    pub fn into_bytes(self) -> Bytes {
+        self.0
+    }
+
+    /// Zero-copy iteration over the pre-validated entries.
+    #[must_use]
+    pub fn iter(&self) -> WireUserHeaderIterator<'_> {
+        WireUserHeaderIterator::new(&self.0)
+    }
+}
+
+impl<'a> IntoIterator for &'a WireUserHeaders {
+    type Item = WireUserHeaderEntry<'a>;
+    type IntoIter = WireUserHeaderIterator<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
+impl WireEncode for WireUserHeaders {
+    fn encoded_size(&self) -> usize {
+        self.0.len()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_slice(&self.0);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn encode_test_entries(entries: &[(u8, &[u8], u8, &[u8])]) -> Vec<u8> {
+        let size = user_headers_encoded_size(entries);
+        let mut buf = BytesMut::with_capacity(size);
+        encode_user_headers(entries, &mut buf);
+        buf.to_vec()
+    }
+
+    #[test]
+    fn empty_buffer_is_valid() {
+        assert_eq!(validate_user_headers(&[]).unwrap(), 0);
+    }
+
+    #[test]
+    fn single_pair_round_trip() {
+        let entries = [(6u8, b"test" as &[u8], 2u8, b"val" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+
+        assert_eq!(validate_user_headers(&encoded).unwrap(), 1);
+
+        let mut iter = WireUserHeaderIterator::new(&encoded);
+        let entry = iter.next().unwrap();
+        assert_eq!(entry.key_kind, WireHeaderKind(6));
+        assert_eq!(entry.key, b"test");
+        assert_eq!(entry.value_kind, WireHeaderKind(2));
+        assert_eq!(entry.value, b"val");
+        assert!(iter.next().is_none());
+    }
+
+    #[test]
+    fn multiple_pairs() {
+        let entries = [
+            (1u8, b"k1" as &[u8], 2u8, b"v1" as &[u8]),
+            (
+                6u8,
+                &42i32.to_le_bytes() as &[u8],
+                12u8,
+                &99u64.to_le_bytes() as &[u8],
+            ),
+            (3u8, &[1u8] as &[u8], 9u8, &[255u8] as &[u8]),
+        ];
+        let encoded = encode_test_entries(&entries);
+
+        assert_eq!(validate_user_headers(&encoded).unwrap(), 3);
+
+        let items: Vec<_> = WireUserHeaderIterator::new(&encoded).collect();
+        assert_eq!(items.len(), 3);
+        assert_eq!(items[0].key, b"k1");
+        assert_eq!(items[1].key, &42i32.to_le_bytes());
+        assert_eq!(items[2].value, &[255u8]);
+    }
+
+    #[test]
+    fn unknown_kind_codes_pass_validation() {
+        let entries = [(16u8, b"a" as &[u8], 255u8, b"b" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+        assert_eq!(validate_user_headers(&encoded).unwrap(), 1);
+
+        let entry = WireUserHeaderIterator::new(&encoded).next().unwrap();
+        assert_eq!(entry.key_kind, WireHeaderKind(16));
+        assert!(!entry.key_kind.is_known());
+        assert_eq!(entry.value_kind, WireHeaderKind(255));
+        assert!(!entry.value_kind.is_known());
+    }
+
+    #[test]
+    fn known_kind_codes() {
+        for code in 1..=15u8 {
+            assert!(WireHeaderKind(code).is_known());
+        }
+        assert!(!WireHeaderKind(0).is_known());
+        assert!(!WireHeaderKind(16).is_known());
+    }
+
+    #[test]
+    fn truncated_at_kind_byte() {
+        let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+        // Keep only the kind byte and 2 of the 4 length bytes (length field cut short)
+        let result = validate_user_headers(&encoded[..3]);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn truncated_at_length_field() {
+        let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+        // Only kind byte, partial length
+        let result = validate_user_headers(&encoded[..2]);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn truncated_at_data() {
+        let entries = [(1u8, b"longkey" as &[u8], 2u8, b"v" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+        // Kind + length present, but data truncated
+        let result = validate_user_headers(&encoded[..7]);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn zero_length_rejected() {
+        // kind=1, length=0 key, then a valid value TLV so the odd-count check cannot mask it
+        let buf = [1u8, 0, 0, 0, 0, 2, 1, 0, 0, 0, b'v'];
+        let result = validate_user_headers(&buf);
+        assert!(matches!(result, Err(WireError::Validation(_))));
+    }
+
+    #[test]
+    fn length_exceeds_max_rejected() {
+        // Key TLV with length=256 (one past the 255 limit), then a valid value TLV
+        let mut buf = vec![1u8, 0, 1, 0, 0]; // kind=1, length=256 as LE u32
+        buf.extend_from_slice(&[0u8; 256]);
+        buf.extend_from_slice(&[2, 1, 0, 0, 0, b'v']);
+        let result = validate_user_headers(&buf);
+        assert!(matches!(result, Err(WireError::Validation(_))));
+    }
+
+    #[test]
+    fn kind_zero_rejected() {
+        // kind=0 is reserved; a valid value TLV follows so the entry count is even
+        let mut buf = vec![0u8];
+        buf.extend_from_slice(&1u32.to_le_bytes());
+        buf.extend_from_slice(&[42, 2, 1, 0, 0, 0, b'v']);
+        let result = validate_user_headers(&buf);
+        assert!(matches!(result, Err(WireError::Validation(_))));
+    }
+
+    #[test]
+    fn trailing_bytes_rejected() {
+        let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])];
+        let mut encoded = encode_test_entries(&entries);
+        encoded.push(0xFF); // trailing garbage
+        let result = validate_user_headers(&encoded);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn odd_tlv_count_rejected() {
+        // One TLV entry (key without value) - odd count
+        let mut buf = vec![1u8];
+        buf.extend_from_slice(&2u32.to_le_bytes());
+        buf.extend_from_slice(b"ab");
+        let result = validate_user_headers(&buf);
+        assert!(matches!(result, Err(WireError::Validation(_))));
+    }
+
+    #[test]
+    fn encoded_size_matches_actual() {
+        let entries = [
+            (1u8, b"key1" as &[u8], 2u8, b"value1" as &[u8]),
+            (
+                6u8,
+                &100i32.to_le_bytes() as &[u8],
+                14u8,
+                &std::f32::consts::PI.to_le_bytes() as &[u8],
+            ),
+        ];
+        let expected_size = user_headers_encoded_size(&entries);
+        let encoded = encode_test_entries(&entries);
+        assert_eq!(expected_size, encoded.len());
+    }
+
+    #[test]
+    fn max_length_field_accepted() {
+        let data = [0xAA; 255];
+        let entries = [(1u8, &data[..], 1u8, &data[..])];
+        let encoded = encode_test_entries(&entries);
+        assert_eq!(validate_user_headers(&encoded).unwrap(), 1);
+    }
+
+    #[test]
+    fn wire_user_headers_empty() {
+        let wire = WireUserHeaders::empty();
+        assert!(wire.is_empty());
+        assert_eq!(wire.as_bytes(), &[]);
+        assert_eq!(wire.encoded_size(), 0);
+    }
+
+    #[test]
+    fn wire_user_headers_from_slice_round_trip() {
+        let entries = [(6u8, b"test" as &[u8], 2u8, b"val" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+
+        let wire = WireUserHeaders::from_slice(&encoded).unwrap();
+        assert!(!wire.is_empty());
+        assert_eq!(wire.as_bytes(), &encoded);
+        assert_eq!(wire.encoded_size(), encoded.len());
+
+        let entry = wire.iter().next().unwrap();
+        assert_eq!(entry.key_kind, WireHeaderKind(6));
+        assert_eq!(entry.key, b"test");
+    }
+
+    #[test]
+    fn wire_user_headers_from_bytes_zero_copy() {
+        let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])];
+        let encoded = Bytes::from(encode_test_entries(&entries));
+
+        let wire = WireUserHeaders::from_bytes(encoded.clone()).unwrap();
+        assert_eq!(wire.as_bytes(), &encoded[..]);
+        assert_eq!(wire.into_bytes(), encoded);
+    }
+
+    #[test]
+    fn wire_user_headers_rejects_invalid() {
+        let buf = [0u8, 1, 0, 0, 0, 42, 2, 1, 0, 0, 0, b'v']; // kind=0 key + valid value TLV
+        assert!(WireUserHeaders::from_slice(&buf).is_err());
+    }
+
+    #[test]
+    fn wire_user_headers_encode() {
+        let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])];
+        let encoded = encode_test_entries(&entries);
+        let wire = WireUserHeaders::from_slice(&encoded).unwrap();
+
+        let mut buf = BytesMut::with_capacity(wire.encoded_size());
+        wire.encode(&mut buf);
+        assert_eq!(&buf[..], &encoded);
+    }
+}
diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml
index 3d2044066..5fabc500d 100644
--- a/core/cli/Cargo.toml
+++ b/core/cli/Cargo.toml
@@ -54,6 +54,7 @@ comfy-table = { workspace = true }
 dirs = { workspace = true }
 figlet-rs = { workspace = true }
 iggy = { workspace = true }
+iggy_binary_protocol = { workspace = true }
 iggy_common = { workspace = true }
 keyring = { workspace = true, optional = true }
 passterm = { workspace = true }
diff --git a/core/cli/src/commands/binary_message/poll_messages.rs 
b/core/cli/src/commands/binary_message/poll_messages.rs
index 1b0a18422..3274bb943 100644
--- a/core/cli/src/commands/binary_message/poll_messages.rs
+++ b/core/cli/src/commands/binary_message/poll_messages.rs
@@ -20,10 +20,12 @@ use crate::commands::cli_command::{CliCommand, 
PRINT_TARGET};
 use anyhow::Context;
 use async_trait::async_trait;
 use comfy_table::{Cell, CellAlignment, Row, Table};
+use iggy_binary_protocol::WireUserHeaders;
 use iggy_common::Client;
 use iggy_common::{
     Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, 
IggyMessage,
-    IggyTimestamp, PollMessages, PollingStrategy, Sizeable, 
user_headers_from_bytes,
+    IggyTimestamp, PollMessages, PollingStrategy, Sizeable,
+    wire_conversions::user_headers_from_wire,
 };
 use std::collections::HashSet;
 use tokio::io::AsyncWriteExt;
@@ -85,7 +87,10 @@ impl PollMessagesCmd {
             .iter()
             .flat_map(|m| {
                 if let Some(user_headers) = &m.user_headers {
-                    match user_headers_from_bytes(user_headers.clone()) {
+                    match WireUserHeaders::from_bytes(user_headers.clone())
+                        .map_err(|_| iggy_common::IggyError::InvalidHeaderKey)
+                        .and_then(|w| user_headers_from_wire(&w))
+                    {
                         Ok(headers) => headers
                             .iter()
                             .map(|(k, v)| (k.clone(), v.kind()))
diff --git a/core/cli/src/commands/binary_message/send_messages.rs 
b/core/cli/src/commands/binary_message/send_messages.rs
index ebbd13c5d..99d40ebda 100644
--- a/core/cli/src/commands/binary_message/send_messages.rs
+++ b/core/cli/src/commands/binary_message/send_messages.rs
@@ -22,7 +22,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use iggy_common::Client;
 use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, 
Partitioning, Sizeable};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::io::{self, Read};
 use tokio::io::AsyncReadExt;
 use tracing::{Level, event};
@@ -73,7 +73,7 @@ impl SendMessagesCmd {
         Ok(buffer)
     }
 
-    fn get_headers(&self) -> Option<HashMap<HeaderKey, HeaderValue>> {
+    fn get_headers(&self) -> Option<BTreeMap<HeaderKey, HeaderValue>> {
         match self.headers.len() {
             0 => None,
             _ => Some(self.headers.iter().cloned().collect()),
diff --git a/core/common/src/error/iggy_error.rs 
b/core/common/src/error/iggy_error.rs
index dca0f75c2..fca1ed1e8 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -388,6 +388,8 @@ pub enum IggyError {
     InvalidMessageTimestampDelta(u64) = 4038,
     #[error("Invalid batch checksum: {0}, expected: {1}, for base offset: 
{2}")]
     InvalidBatchChecksum(u64, u64, u64) = 4039,
+    #[error("Invalid header kind code: {0}")]
+    InvalidHeaderKind(u8) = 4040,
     #[error("Cannot sed messages due to client disconnection")]
     CannotSendMessagesDueToClientDisconnection = 4050,
     #[error("Background send error")]
diff --git a/core/common/src/http/messages/send_messages.rs 
b/core/common/src/http/messages/send_messages.rs
index d01049dae..445005f64 100644
--- a/core/common/src/http/messages/send_messages.rs
+++ b/core/common/src/http/messages/send_messages.rs
@@ -29,7 +29,7 @@ use bytes::Bytes;
 use serde::de::{self, MapAccess, Visitor};
 use serde::ser::SerializeStruct;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::fmt::Formatter;
 
 /// `SendMessages` command is used to send messages to a topic in a stream.
@@ -233,7 +233,7 @@ impl<'de> Deserialize<'de> for SendMessages {
                                                     "Invalid headers format: 
{e}"
                                                 ))
                                             })?;
-                                            let mut map = HashMap::new();
+                                            let mut map = BTreeMap::new();
                                             for entry in entries {
                                                 map.insert(entry.key, 
entry.value);
                                             }
diff --git a/core/common/src/types/message/iggy_message.rs 
b/core/common/src/types/message/iggy_message.rs
index 914fe27dd..1eabac333 100644
--- a/core/common/src/types/message/iggy_message.rs
+++ b/core/common/src/types/message/iggy_message.rs
@@ -17,19 +17,19 @@
  */
 
 use super::message_header::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader};
-use super::user_headers::{get_user_headers_size, user_headers_from_bytes, 
user_headers_to_bytes};
 use crate::Sizeable;
 use crate::error::IggyError;
 use crate::utils::byte_size::IggyByteSize;
 use crate::utils::timestamp::IggyTimestamp;
+use crate::wire_conversions::{user_headers_from_wire, user_headers_to_wire};
 use crate::{HeaderKey, HeaderValue};
 use bon::bon;
 use bytes::{BufMut, Bytes, BytesMut};
+use iggy_binary_protocol::WireUserHeaders;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::convert::TryFrom;
 use std::str::FromStr;
-use tracing::warn;
 
 /// Maximum allowed size in bytes for a message payload.
 ///
@@ -71,7 +71,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 /// ```
 /// use iggy_common::*;
 /// use std::str::FromStr;
-/// use std::collections::HashMap;
+/// use std::collections::BTreeMap;
 /// use bytes::Bytes;
 ///
 /// // Create a simple text message
@@ -96,7 +96,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000;
 /// // Create a message with headers
 /// let key = HeaderKey::from_str("content-type").unwrap();
 /// let value = HeaderValue::from_str("text/plain").unwrap();
-/// let user_headers = HashMap::from([(key, value)]);
+/// let user_headers = BTreeMap::from([(key, value)]);
 ///
 /// let message = IggyMessage::builder()
 ///     .payload("Message with metadata".into())
@@ -135,7 +135,7 @@ impl IggyMessage {
     /// ```
     /// use iggy_common::*;
     /// use bytes::Bytes;
-    /// use std::collections::HashMap;
+    /// use std::collections::BTreeMap;
     /// use std::str::FromStr;
     ///
     /// // Simple message with just payload
@@ -154,7 +154,7 @@ impl IggyMessage {
     /// // Message with headers
     /// let key = HeaderKey::from_str("content-type").unwrap();
     /// let value = HeaderValue::from_str("text/plain").unwrap();
-    /// let user_headers = HashMap::from([(key, value)]);
+    /// let user_headers = BTreeMap::from([(key, value)]);
     /// let msg = IggyMessage::builder()
     ///     .payload("Hello".into())
     ///     .user_headers(user_headers)
@@ -165,7 +165,7 @@ impl IggyMessage {
     pub fn new(
         id: Option<u128>,
         payload: Bytes,
-        user_headers: Option<HashMap<HeaderKey, HeaderValue>>,
+        user_headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
     ) -> Result<Self, IggyError> {
         if payload.is_empty() {
             return Err(IggyError::InvalidMessagePayloadLength);
@@ -175,7 +175,10 @@ impl IggyMessage {
             return Err(IggyError::TooBigMessagePayload);
         }
 
-        let user_headers_length = 
get_user_headers_size(&user_headers).unwrap_or(0);
+        let wire_headers = user_headers.as_ref().map(user_headers_to_wire);
+        let user_headers_length = wire_headers
+            .as_ref()
+            .map_or(0, |w| w.as_bytes().len() as u32);
 
         if user_headers_length > MAX_USER_HEADERS_SIZE {
             return Err(IggyError::TooBigUserHeaders);
@@ -192,7 +195,7 @@ impl IggyMessage {
             reserved: 0,
         };
 
-        let user_headers = user_headers.map(|h| user_headers_to_bytes(&h));
+        let user_headers = wire_headers.map(|w| w.into_bytes());
 
         Ok(Self {
             header,
@@ -203,13 +206,13 @@ impl IggyMessage {
 }
 
 impl IggyMessage {
-    /// Gets the user headers as a typed HashMap.
+    /// Gets the user headers as a typed BTreeMap.
     ///
-    /// This method parses the binary header data into a typed HashMap for 
easy access.
+    /// This method parses the binary header data into a typed BTreeMap for 
easy access.
     ///
     /// # Returns
     ///
-    /// * `Ok(Some(HashMap))` - Successfully parsed headers
+    /// * `Ok(Some(BTreeMap))` - Successfully parsed headers
     /// * `Ok(None)` - No headers present
     /// * `Err(IggyError)` - Error parsing headers
     ///
@@ -218,11 +221,11 @@ impl IggyMessage {
     /// ```
     /// use iggy_common::*;
     /// use std::str::FromStr;
-    /// use std::collections::HashMap;
+    /// use std::collections::BTreeMap;
     ///
     /// let key = HeaderKey::from_str("content-type").unwrap();
     /// let value = HeaderValue::from_str("text/plain").unwrap();
-    /// let user_headers_map = HashMap::from([(key.clone(), value)]);
+    /// let user_headers_map = BTreeMap::from([(key.clone(), value)]);
     ///
     /// let message = IggyMessage::builder()
     ///     .payload("Hello".into())
@@ -233,21 +236,22 @@ impl IggyMessage {
     /// let headers = message.user_headers_map().unwrap().unwrap();
     /// assert!(headers.contains_key(&key));
     /// ```
-    pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, 
HeaderValue>>, IggyError> {
+    pub fn user_headers_map(&self) -> Result<Option<BTreeMap<HeaderKey, 
HeaderValue>>, IggyError> {
         if let Some(user_headers) = &self.user_headers {
-            match user_headers_from_bytes(user_headers.clone()) {
-                Ok(h) => Ok(Some(h)),
+            let wire = match WireUserHeaders::from_bytes(user_headers.clone()) 
{
+                Ok(w) => w,
                 Err(e) => {
-                    warn!(
-                        "Failed to deserialize user headers: {e} for message 
at offset {}, sent at: {} ({}), user_headers_length: {}, skipping field...",
+                    tracing::warn!(
+                        "Failed to parse user headers for message at offset 
{}, \
+                         user_headers_length: {}: {e}",
                         self.header.offset,
-                        
IggyTimestamp::from(self.header.origin_timestamp).to_rfc3339_string(),
-                        self.header.origin_timestamp,
                         self.header.user_headers_length
                     );
-                    Ok(None)
+                    return Err(IggyError::InvalidHeaderKey); // malformed is not "absent"
                 }
-            }
+            };
+            let map = user_headers_from_wire(&wire)?;
+            Ok(Some(map))
         } else {
             Ok(None)
         }
@@ -264,7 +268,7 @@ impl IggyMessage {
     /// # Returns
     ///
     /// * `Ok(Some(HeaderValue))` - User header found with its value
-    /// * `Ok(None)` - User header not found or user headers couldn't be parsed
+    /// * `Ok(None)` - User header not found or no user headers present
     /// * `Err(IggyError)` - Error accessing user headers
     ///
     /// # Examples
@@ -272,11 +276,11 @@ impl IggyMessage {
     /// ```
     /// use iggy_common::*;
     /// use std::str::FromStr;
-    /// use std::collections::HashMap;
+    /// use std::collections::BTreeMap;
     ///
     /// let key = HeaderKey::from_str("content-type").unwrap();
     /// let value = HeaderValue::from_str("text/plain").unwrap();
-    /// let user_headers_map = HashMap::from([(key.clone(), value.clone())]);
+    /// let user_headers_map = BTreeMap::from([(key.clone(), value.clone())]);
     ///
     /// let message = IggyMessage::builder()
     ///     .payload("Hello".into())
@@ -310,11 +314,11 @@ impl IggyMessage {
     /// ```
     /// use iggy_common::*;
     /// use std::str::FromStr;
-    /// use std::collections::HashMap;
+    /// use std::collections::BTreeMap;
     ///
     /// let key = HeaderKey::from_str("content-type").unwrap();
     /// let value = HeaderValue::from_str("text/plain").unwrap();
-    /// let user_headers_map = HashMap::from([(key.clone(), value)]);
+    /// let user_headers_map = BTreeMap::from([(key.clone(), value)]);
     ///
     /// let message = IggyMessage::builder()
     ///     .payload("Hello".into())
@@ -597,7 +601,7 @@ impl<'de> Deserialize<'de> for IggyMessage {
             {
                 let mut header: Option<IggyMessageHeader> = None;
                 let mut payload: Option<Bytes> = None;
-                let mut user_headers: Option<HashMap<HeaderKey, HeaderValue>> 
= None;
+                let mut user_headers: Option<BTreeMap<HeaderKey, HeaderValue>> 
= None;
                 let mut raw_user_headers: Option<Bytes> = None;
 
                 while let Some(key) = map.next_key::<String>()? {
@@ -630,7 +634,7 @@ impl<'de> Deserialize<'de> for IggyMessage {
                                     .map_err(|e| {
                                         de::Error::custom(format!("Invalid 
headers format: {e}"))
                                     })?;
-                                let mut headers_map = HashMap::new();
+                                let mut headers_map = BTreeMap::new();
                                 for entry in entries {
                                     headers_map.insert(entry.key, entry.value);
                                 }
@@ -649,7 +653,7 @@ impl<'de> Deserialize<'de> for IggyMessage {
                 let user_headers_bytes = if let Some(raw) = raw_user_headers {
                     Some(raw)
                 } else {
-                    user_headers.map(|headers| user_headers_to_bytes(&headers))
+                    user_headers.map(|headers| 
user_headers_to_wire(&headers).into_bytes())
                 };
 
                 let user_headers_length = user_headers_bytes
@@ -704,7 +708,7 @@ mod tests {
 
     #[test]
     fn test_create_with_headers() {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             HeaderKey::try_from("content-type").unwrap(),
             HeaderValue::try_from("text/plain").unwrap(),
@@ -782,7 +786,7 @@ mod tests {
 
     #[test]
     fn test_json_serialization_with_headers() {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             HeaderKey::try_from("content-type").unwrap(),
             HeaderValue::try_from("text/plain").unwrap(),
diff --git a/core/common/src/types/message/message_view.rs 
b/core/common/src/types/message/message_view.rs
index c3db53154..0706b047d 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -19,14 +19,14 @@
 use super::HeaderValue;
 use super::message_boundaries::IggyMessageBoundaries;
 use super::message_header::*;
-use super::user_headers::user_headers_from_bytes;
 use crate::IggyByteSize;
 use crate::Sizeable;
 use crate::error::IggyError;
 use crate::utils::checksum;
+use crate::wire_conversions::user_headers_from_wire;
 use crate::{HeaderKey, IggyMessageHeaderView};
-use bytes::Bytes;
-use std::collections::HashMap;
+use iggy_binary_protocol::WireUserHeaders;
+use std::collections::BTreeMap;
 use std::num::NonZeroUsize;
 
 /// A immutable view of a message.
@@ -86,22 +86,20 @@ impl<'a> IggyMessageView<'a> {
     }
 
     /// Return instantiated user headers map
-    pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, 
HeaderValue>>, IggyError> {
+    pub fn user_headers_map(&self) -> Result<Option<BTreeMap<HeaderKey, 
HeaderValue>>, IggyError> {
         if let Some(headers) = self.user_headers() {
-            let headers_bytes = Bytes::copy_from_slice(headers);
-
-            match user_headers_from_bytes(headers_bytes) {
-                Ok(h) => Ok(Some(h)),
+            let wire = match WireUserHeaders::from_slice(headers) {
+                Ok(w) => w,
                 Err(e) => {
-                    tracing::error!(
-                        "Error parsing headers: {}, header_length={}",
-                        e,
+                    tracing::warn!(
+                        "Failed to parse user headers: {e}, header_length={}",
                         self.header().user_headers_length()
                     );
-
-                    Ok(None)
+                    return Err(IggyError::InvalidHeaderKey); // malformed is not "absent"
                 }
-            }
+            };
+            let map = user_headers_from_wire(&wire)?;
+            Ok(Some(map))
         } else {
             Ok(None)
         }
diff --git a/core/common/src/types/message/mod.rs 
b/core/common/src/types/message/mod.rs
index bd5688926..281cc3273 100644
--- a/core/common/src/types/message/mod.rs
+++ b/core/common/src/types/message/mod.rs
@@ -71,6 +71,5 @@ pub use polling_kind::PollingKind;
 pub use polling_strategy::PollingStrategy;
 pub use user_headers::{
     HeaderEntry, HeaderField, HeaderKey, HeaderKind, HeaderValue, KeyMarker, 
UserHeaders,
-    ValueMarker, deserialize_headers, serialize_headers, 
user_headers_from_bytes,
-    user_headers_to_bytes,
+    ValueMarker, deserialize_headers, serialize_headers,
 };
diff --git a/core/common/src/types/message/user_headers.rs 
b/core/common/src/types/message/user_headers.rs
index 396608199..72028464f 100644
--- a/core/common/src/types/message/user_headers.rs
+++ b/core/common/src/types/message/user_headers.rs
@@ -17,11 +17,11 @@
  */
 
 use crate::error::IggyError;
-use bytes::{BufMut, Bytes, BytesMut};
+use bytes::Bytes;
 use serde::{Deserialize, Serialize};
 use serde_with::base64::Base64;
 use serde_with::serde_as;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::marker::PhantomData;
@@ -84,7 +84,7 @@ pub type HeaderKey = HeaderField<KeyMarker>;
 pub type HeaderValue = HeaderField<ValueMarker>;
 
 /// Type alias for a collection of user-defined message headers.
-pub type UserHeaders = HashMap<HeaderKey, HeaderValue>;
+pub type UserHeaders = BTreeMap<HeaderKey, HeaderValue>;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub struct KeyMarker;
@@ -266,11 +266,11 @@ impl HeaderKind {
             13 => Ok(HeaderKind::Uint128),
             14 => Ok(HeaderKind::Float32),
             15 => Ok(HeaderKind::Float64),
-            _ => Err(IggyError::InvalidCommand),
+            _ => Err(IggyError::InvalidHeaderKind(code)),
         }
     }
 
-    fn expected_size(&self) -> Option<usize> {
+    pub(crate) fn expected_size(&self) -> Option<usize> {
         match self {
             HeaderKind::Raw | HeaderKind::String => None,
             HeaderKind::Bool | HeaderKind::Int8 | HeaderKind::Uint8 => Some(1),
@@ -282,6 +282,18 @@ impl HeaderKind {
     }
 }
 
+impl Ord for HeaderKind {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.as_code().cmp(&other.as_code())
+    }
+}
+
+impl PartialOrd for HeaderKind {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
 impl FromStr for HeaderKind {
     type Err = IggyError;
     fn from_str(s: &str) -> Result<Self, Self::Err> {
@@ -598,15 +610,27 @@ impl<T> HeaderField<T> {
         }
     }
 
-    fn new_unchecked(kind: HeaderKind, value: &[u8]) -> Self {
+    pub(crate) fn new_unchecked(kind: HeaderKind, value: &[u8]) -> Self {
         Self {
             kind,
-            value: Bytes::from(value.to_vec()),
+            value: Bytes::copy_from_slice(value),
             _marker: PhantomData,
         }
     }
 }
 
+impl<T: Eq> Ord for HeaderField<T> {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        (self.kind.as_code(), &self.value).cmp(&(other.kind.as_code(), 
&other.value))
+    }
+}
+
+impl<T: Eq> PartialOrd for HeaderField<T> {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
 impl<T> TryFrom<&str> for HeaderField<T> {
     type Error = IggyError;
     fn try_from(value: &str) -> Result<Self, Self::Error> {
@@ -929,110 +953,6 @@ impl<T> TryFrom<&HeaderField<T>> for Vec<u8> {
     }
 }
 
-pub fn user_headers_to_bytes(headers: &HashMap<HeaderKey, HeaderValue>) -> 
Bytes {
-    if headers.is_empty() {
-        return Bytes::new();
-    }
-
-    let mut bytes = BytesMut::new();
-    for (key, value) in headers {
-        bytes.put_u8(key.kind().as_code());
-        #[allow(clippy::cast_possible_truncation)]
-        bytes.put_u32_le(key.as_bytes().len() as u32);
-        bytes.put_slice(key.as_bytes());
-        bytes.put_u8(value.kind().as_code());
-        #[allow(clippy::cast_possible_truncation)]
-        bytes.put_u32_le(value.as_bytes().len() as u32);
-        bytes.put_slice(value.as_bytes());
-    }
-
-    bytes.freeze()
-}
-
-pub fn user_headers_from_bytes(bytes: Bytes) -> Result<HashMap<HeaderKey, 
HeaderValue>, IggyError> {
-    if bytes.is_empty() {
-        return Ok(HashMap::new());
-    }
-
-    let mut headers = HashMap::new();
-    let mut position = 0;
-    while position < bytes.len() {
-        let key_kind = HeaderKind::from_code(bytes[position])?;
-        position += 1;
-
-        if position + 4 > bytes.len() {
-            return Err(IggyError::InvalidHeaderKey);
-        }
-        let key_length = u32::from_le_bytes(
-            bytes[position..position + 4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        ) as usize;
-        if key_length == 0 || key_length > 255 {
-            return Err(IggyError::InvalidHeaderKey);
-        }
-        position += 4;
-
-        if position + key_length > bytes.len() {
-            return Err(IggyError::InvalidHeaderKey);
-        }
-        if let Some(expected) = key_kind.expected_size()
-            && key_length != expected
-        {
-            return Err(IggyError::InvalidHeaderKey);
-        }
-        let key_value = bytes[position..position + key_length].to_vec();
-        position += key_length;
-
-        if position >= bytes.len() {
-            return Err(IggyError::InvalidHeaderValue);
-        }
-        let value_kind = HeaderKind::from_code(bytes[position])?;
-        position += 1;
-
-        if position + 4 > bytes.len() {
-            return Err(IggyError::InvalidHeaderValue);
-        }
-        let value_length = u32::from_le_bytes(
-            bytes[position..position + 4]
-                .try_into()
-                .map_err(|_| IggyError::InvalidNumberEncoding)?,
-        ) as usize;
-        if value_length == 0 || value_length > 255 {
-            return Err(IggyError::InvalidHeaderValue);
-        }
-        position += 4;
-
-        if position + value_length > bytes.len() {
-            return Err(IggyError::InvalidHeaderValue);
-        }
-        if let Some(expected) = value_kind.expected_size()
-            && value_length != expected
-        {
-            return Err(IggyError::InvalidHeaderValue);
-        }
-        let value_value = bytes[position..position + value_length].to_vec();
-        position += value_length;
-
-        headers.insert(
-            HeaderKey::new_unchecked(key_kind, &key_value),
-            HeaderValue::new_unchecked(value_kind, &value_value),
-        );
-    }
-
-    Ok(headers)
-}
-
-pub fn get_user_headers_size(headers: &Option<HashMap<HeaderKey, 
HeaderValue>>) -> Option<u32> {
-    let mut size = 0;
-    if let Some(headers) = headers {
-        for (key, value) in headers {
-            size += 1 + 4 + key.as_bytes().len() as u32 + 1 + 4 + 
value.as_bytes().len() as u32;
-        }
-    }
-    Some(size)
-}
-
 #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 pub struct HeaderEntry {
     pub key: HeaderKey,
@@ -1080,6 +1000,9 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::wire_conversions::{user_headers_from_wire, 
user_headers_to_wire};
+    use bytes::{BufMut, BytesMut};
+    use iggy_binary_protocol::WireUserHeaders;
 
     #[test]
     fn header_key_should_be_created_for_valid_value() {
@@ -1356,7 +1279,7 @@ mod tests {
 
     #[test]
     fn should_be_serialized_as_bytes() {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             HeaderKey::try_from("key-1").unwrap(),
             HeaderValue::from_str("Value 1").unwrap(),
@@ -1364,7 +1287,7 @@ mod tests {
         headers.insert(HeaderKey::try_from("key 1").unwrap(), 12345u64.into());
         headers.insert(HeaderKey::try_from("key_3").unwrap(), true.into());
 
-        let bytes = user_headers_to_bytes(&headers);
+        let bytes = user_headers_to_wire(&headers).into_bytes();
 
         let mut position = 0;
         let mut headers_count = 0;
@@ -1403,7 +1326,7 @@ mod tests {
 
     #[test]
     fn should_be_deserialized_from_bytes() {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             HeaderKey::try_from("key-1").unwrap(),
             HeaderValue::from_str("Value 1").unwrap(),
@@ -1421,7 +1344,9 @@ mod tests {
             bytes.put_slice(&value.value);
         }
 
-        let deserialized_headers = user_headers_from_bytes(bytes.freeze());
+        let frozen = bytes.freeze();
+        let wire = WireUserHeaders::from_slice(&frozen).unwrap();
+        let deserialized_headers = user_headers_from_wire(&wire);
 
         assert!(deserialized_headers.is_ok());
         let deserialized_headers = deserialized_headers.unwrap();
@@ -1438,15 +1363,16 @@ mod tests {
 
     #[test]
     fn should_serialize_and_deserialize_typed_keys() {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             123i32.into(),
             HeaderValue::from_str("Value for int key").unwrap(),
         );
         headers.insert(999u64.into(), true.into());
 
-        let bytes = user_headers_to_bytes(&headers);
-        let deserialized = user_headers_from_bytes(bytes).unwrap();
+        let bytes = user_headers_to_wire(&headers).into_bytes();
+        let wire = WireUserHeaders::from_slice(&bytes).unwrap();
+        let deserialized = user_headers_from_wire(&wire).unwrap();
 
         assert_eq!(deserialized.len(), headers.len());
         for (key, value) in &headers {
@@ -1655,7 +1581,7 @@ mod tests {
         bytes.put_u32_le(100); // key_len = 100 (lie!)
         bytes.put_slice(b"abc"); // only 3 bytes of key data
 
-        let result = user_headers_from_bytes(bytes.freeze());
+        let result = WireUserHeaders::from_slice(&bytes.freeze());
         assert!(result.is_err());
     }
 
@@ -1682,7 +1608,9 @@ mod tests {
         bytes.put_u32_le(4); // value_len = 4 (wrong, should be 2)
         bytes.put_slice(&[1, 2, 3, 4]);
 
-        let result = user_headers_from_bytes(bytes.freeze());
+        let frozen = bytes.freeze();
+        let wire = WireUserHeaders::from_slice(&frozen).unwrap();
+        let result = user_headers_from_wire(&wire);
         assert!(result.is_err());
     }
 
@@ -1697,7 +1625,7 @@ mod tests {
         bytes.put_u8(6); // value_kind = Int32
         // Missing value length bytes
 
-        let result = user_headers_from_bytes(bytes.freeze());
+        let result = WireUserHeaders::from_slice(&bytes.freeze());
         assert!(result.is_err());
     }
 
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 24608cb62..621f4e3bd 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -130,12 +130,12 @@ mod tests {
     #[test]
     fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() {
         use bytes::Bytes;
-        use std::collections::HashMap;
+        use std::collections::BTreeMap;
 
         let key = [1; 32];
         let encryptor = Aes256GcmEncryptor::new(&key).unwrap();
 
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             HeaderKey::try_from("batch").unwrap(),
             HeaderValue::from(1u64),
diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs
index fc2cc2333..35dfcfe9e 100644
--- a/core/common/src/wire_conversions.rs
+++ b/core/common/src/wire_conversions.rs
@@ -26,10 +26,10 @@ use crate::{
     CacheMetrics, CacheMetricsKey, ClientInfo, ClientInfoDetails, ClusterMetadata, ClusterNode,
     ClusterNodeRole, ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroup,
     ConsumerGroupDetails, ConsumerGroupInfo, ConsumerGroupMember, ConsumerOffsetInfo,
-    GlobalPermissions, IdKind, IdentityInfo, IggyByteSize, IggyError, IggyExpiry, MaxTopicSize,
-    Partition, Permissions, PersonalAccessTokenInfo, RawPersonalAccessToken, Stats, Stream,
-    StreamDetails, StreamPermissions, Topic, TopicDetails, TopicPermissions, TransportEndpoints,
-    UserInfo, UserInfoDetails, UserStatus,
+    GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, IdKind, IdentityInfo, IggyByteSize,
+    IggyError, IggyExpiry, MaxTopicSize, Partition, Permissions, PersonalAccessTokenInfo,
+    RawPersonalAccessToken, Stats, Stream, StreamDetails, StreamPermissions, Topic, TopicDetails,
+    TopicPermissions, TransportEndpoints, UserInfo, UserInfoDetails, UserStatus,
 };
 use iggy_binary_protocol::WireConsumer;
 use iggy_binary_protocol::primitives::permissions::{
@@ -716,3 +716,70 @@ fn topic_permissions_to_wire(topic_id: usize, tp: &TopicPermissions) -> WireTopi
         send_messages: tp.send_messages,
     }
 }
+
+// -- User Headers conversions --
+
+/// Encode domain user headers into a [`WireUserHeaders`] wrapper (entries in ascending key order).
+pub fn user_headers_to_wire(
+    headers: &BTreeMap<HeaderKey, HeaderValue>,
+) -> iggy_binary_protocol::WireUserHeaders {
+    use bytes::{BufMut, BytesMut};
+    use iggy_binary_protocol::WireUserHeaders;
+
+    if headers.is_empty() {
+        return WireUserHeaders::empty(); // nothing to encode, so no buffer is allocated
+    }
+    let size: usize = headers // per entry: kind (1) + length (4) + bytes, for key and for value
+        .iter()
+        .map(|(k, v)| 1 + 4 + k.as_bytes().len() + 1 + 4 + v.as_bytes().len())
+        .sum();
+    let mut buf = BytesMut::with_capacity(size); // sized to exactly what the loop below writes
+    for (key, value) in headers {
+        buf.put_u8(key.kind().as_code()); // key kind code
+        #[allow(clippy::cast_possible_truncation)] // NOTE(review): assumes key length fits u32
+        buf.put_u32_le(key.as_bytes().len() as u32); // key length, little-endian
+        buf.put_slice(key.as_bytes()); // raw key bytes
+        buf.put_u8(value.kind().as_code()); // value kind code
+        #[allow(clippy::cast_possible_truncation)] // NOTE(review): assumes value length fits u32
+        buf.put_u32_le(value.as_bytes().len() as u32); // value length, little-endian
+        buf.put_slice(value.as_bytes()); // raw value bytes
+    }
+    // Every entry above was written as kind + length + bytes from typed HeaderKey/HeaderValue
+    // data, so the TLV structure holds by construction and `from_validated` gets it as-is.
+    WireUserHeaders::from_validated(buf.freeze())
+}
+
+/// Decode a [`WireUserHeaders`] wrapper into domain user headers.
+///
+/// Wire-level validation accepts unknown kind codes for forward compatibility
+/// (VSR rolling upgrades). Domain-level `from_code()` rejects them - the wire
+/// layer stores structurally valid data, the domain layer requires known semantics.
+pub fn user_headers_from_wire(
+    wire: &iggy_binary_protocol::WireUserHeaders,
+) -> Result<BTreeMap<HeaderKey, HeaderValue>, IggyError> {
+    if wire.is_empty() {
+        return Ok(BTreeMap::new()); // empty wire payload decodes to an empty map, not an error
+    }
+    let mut headers = BTreeMap::new();
+    for entry in wire.iter() {
+        let key_kind = HeaderKind::from_code(entry.key_kind.0)?; // `?` propagates a rejected code
+        if let Some(expected) = key_kind.expected_size() // kinds reporting a size need that length
+            && entry.key.len() != expected
+        {
+            return Err(IggyError::InvalidHeaderKey); // key byte length does not match its kind
+        }
+
+        let value_kind = HeaderKind::from_code(entry.value_kind.0)?; // same check for the value
+        if let Some(expected) = value_kind.expected_size() // `None` skips the length check
+            && entry.value.len() != expected
+        {
+            return Err(IggyError::InvalidHeaderValue); // value byte length does not match its kind
+        }
+
+        headers.insert( // a key seen again on the wire overwrites the earlier value
+            HeaderKey::new_unchecked(key_kind, entry.key), // NOTE(review): presumably unvalidated
+            HeaderValue::new_unchecked(value_kind, entry.value), // beyond the checks above; confirm
+        );
+    }
+    Ok(headers)
+}
diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs
index 48961aca6..b6c8965d3 100644
--- a/core/connectors/runtime/src/sink.rs
+++ b/core/connectors/runtime/src/sink.rs
@@ -452,7 +452,7 @@ async fn process_messages(
         checksum: message.header.checksum,
         timestamp: message.header.timestamp,
         origin_timestamp: message.header.origin_timestamp,
-        headers: message.user_headers_map().unwrap_or_default(),
+        headers: message.user_headers_map().unwrap_or(None),
         payload: message.payload.into(),
     });
 
diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs
index 51ec06026..c4af20450 100644
--- a/core/connectors/runtime/src/source.rs
+++ b/core/connectors/runtime/src/source.rs
@@ -30,7 +30,7 @@ use iggy_connector_sdk::{
 };
 use once_cell::sync::Lazy;
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     str::FromStr,
     sync::{Arc, atomic::Ordering},
 };
@@ -567,7 +567,7 @@ pub(crate) extern "C" fn handle_produced_messages(
 fn build_iggy_message(
     payload: Vec<u8>,
     id: Option<u128>,
-    headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
 ) -> Result<IggyMessage, IggyError> {
     match (id, headers) {
         (Some(id), Some(h)) => IggyMessage::builder()
diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs
index 9a372de10..a864407be 100644
--- a/core/connectors/sdk/src/lib.rs
+++ b/core/connectors/sdk/src/lib.rs
@@ -31,7 +31,7 @@ use iggy::prelude::{HeaderKey, HeaderValue};
 use once_cell::sync::OnceCell;
 use prost::Message;
 use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, sync::Arc};
+use std::{collections::BTreeMap, sync::Arc};
 use strum_macros::{Display, IntoStaticStr};
 use thiserror::Error;
 use tokio::runtime::Runtime;
@@ -292,7 +292,7 @@ pub struct ReceivedMessage {
     pub checksum: u64,
     pub timestamp: u64,
     pub origin_timestamp: u64,
-    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
     pub payload: Vec<u8>,
 }
 
@@ -311,7 +311,7 @@ pub struct ProducedMessage {
     pub checksum: Option<u64>,
     pub timestamp: Option<u64>,
     pub origin_timestamp: Option<u64>,
-    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
     pub payload: Vec<u8>,
 }
 
@@ -323,7 +323,7 @@ pub struct DecodedMessage {
     pub checksum: Option<u64>,
     pub timestamp: Option<u64>,
     pub origin_timestamp: Option<u64>,
-    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
     pub payload: Payload,
 }
 
@@ -354,7 +354,7 @@ pub struct ConsumedMessage {
     pub checksum: u64,
     pub timestamp: u64,
     pub origin_timestamp: u64,
-    pub headers: Option<HashMap<HeaderKey, HeaderValue>>,
+    pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>,
     pub payload: Payload,
 }
 
diff --git a/core/connectors/sinks/http_sink/src/lib.rs b/core/connectors/sinks/http_sink/src/lib.rs
index 03bc5ba44..6f86885b5 100644
--- a/core/connectors/sinks/http_sink/src/lib.rs
+++ b/core/connectors/sinks/http_sink/src/lib.rs
@@ -1259,6 +1259,7 @@ impl Sink for HttpSink {
 mod tests {
     use super::*;
     use iggy_connector_sdk::Schema;
+    use std::collections::BTreeMap;
 
     const FIELD_DATA: &str = "data";
     const FIELD_PAYLOAD_ENCODING: &str = "iggy_payload_encoding";
@@ -1651,7 +1652,7 @@ mod tests {
         let topic_meta = given_topic_metadata();
         let msg_meta = given_messages_metadata();
 
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(
             "x-correlation-id".parse().unwrap(),
             "abc-123".parse().unwrap(),
diff --git a/core/integration/tests/cli/message/test_message_poll_command.rs b/core/integration/tests/cli/message/test_message_poll_command.rs
index 7d7532912..0ed93d217 100644
--- a/core/integration/tests/cli/message/test_message_poll_command.rs
+++ b/core/integration/tests/cli/message/test_message_poll_command.rs
@@ -25,7 +25,7 @@ use bytes::Bytes;
 use iggy::prelude::*;
 use predicates::str::{contains, starts_with};
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::str::FromStr;
 
 struct TestMessagePollCmd {
@@ -143,7 +143,7 @@ impl IggyCmdTestCase for TestMessagePollCmd {
                 let payload = Bytes::from(s.as_bytes().to_vec());
                 IggyMessage::builder()
                     .payload(payload)
-                    .user_headers(HashMap::from([self.headers.clone()]))
+                    .user_headers(BTreeMap::from([self.headers.clone()]))
                     .build()
                     .expect("Failed to create message with headers")
             })
diff --git a/core/integration/tests/cli/message/test_message_poll_to_file_command.rs b/core/integration/tests/cli/message/test_message_poll_to_file_command.rs
index 3f9d029e5..842b6f963 100644
--- a/core/integration/tests/cli/message/test_message_poll_to_file_command.rs
+++ b/core/integration/tests/cli/message/test_message_poll_to_file_command.rs
@@ -23,7 +23,7 @@ use bytes::Bytes;
 use iggy::prelude::*;
 use predicates::str::{contains, is_match, starts_with};
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::path::Path;
 use std::str::FromStr;
 
@@ -33,7 +33,7 @@ pub(super) struct TestMessagePollToFileCmd<'a> {
     messages: Vec<&'a str>,
     message_count: usize,
     strategy: PollingStrategy,
-    headers: HashMap<HeaderKey, HeaderValue>,
+    headers: BTreeMap<HeaderKey, HeaderValue>,
     output_file: String,
     cleanup: bool,
     // These will be populated after creating the resources
@@ -49,7 +49,7 @@ impl<'a> TestMessagePollToFileCmd<'a> {
         messages: &[&'a str],
         message_count: usize,
         strategy: PollingStrategy,
-        headers: HashMap<HeaderKey, HeaderValue>,
+        headers: BTreeMap<HeaderKey, HeaderValue>,
         output_file: &str,
         cleanup: bool,
     ) -> Self {
@@ -240,7 +240,7 @@ pub async fn should_be_successful() {
         "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa",
     ];
 
-    let test_headers = HashMap::from([
+    let test_headers = BTreeMap::from([
         (
             HeaderKey::from_str("HeaderKey1").unwrap(),
             HeaderValue::from_str("HeaderValue1").unwrap(),
diff --git a/core/integration/tests/cli/message/test_message_reply_via_file.rs b/core/integration/tests/cli/message/test_message_reply_via_file.rs
index f4cbd6c6e..d65518c36 100644
--- a/core/integration/tests/cli/message/test_message_reply_via_file.rs
+++ b/core/integration/tests/cli/message/test_message_reply_via_file.rs
@@ -21,7 +21,7 @@ use crate::cli::message::test_message_poll_to_file_command::TestMessagePollToFil
 use crate::cli::message::test_message_send_from_file_command::TestMessageSendFromFileCmd;
 use iggy::prelude::*;
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::str::FromStr;
 
 #[tokio::test]
@@ -42,7 +42,7 @@ pub async fn should_be_successful() {
         "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa",
     ];
 
-    let test_headers = HashMap::from([
+    let test_headers = BTreeMap::from([
         (
             HeaderKey::from_str("HeaderKey1").unwrap(),
             HeaderValue::from_str("HeaderValue1").unwrap(),
diff --git a/core/integration/tests/cli/message/test_message_send_command.rs b/core/integration/tests/cli/message/test_message_send_command.rs
index 97ed4fe66..d6037446a 100644
--- a/core/integration/tests/cli/message/test_message_send_command.rs
+++ b/core/integration/tests/cli/message/test_message_send_command.rs
@@ -24,7 +24,7 @@ use async_trait::async_trait;
 use iggy::prelude::*;
 use predicates::str::diff;
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::str::from_utf8;
 use twox_hash::XxHash32;
 
@@ -58,7 +58,7 @@ struct TestMessageSendCmd {
     messages: Vec<String>,
     message_input: ProvideMessages,
     partitioning: PartitionSelection,
-    header: Option<HashMap<HeaderKey, HeaderValue>>,
+    header: Option<BTreeMap<HeaderKey, HeaderValue>>,
     // These will be populated after creating the resources
     actual_stream_id: Option<u32>,
     actual_topic_id: Option<u32>,
@@ -72,7 +72,7 @@ impl TestMessageSendCmd {
         messages: Vec<String>,
         message_input: ProvideMessages,
         partitioning: PartitionSelection,
-        header: Option<HashMap<HeaderKey, HeaderValue>>,
+        header: Option<BTreeMap<HeaderKey, HeaderValue>>,
     ) -> Self {
         Self {
             stream_name,
@@ -345,7 +345,7 @@ pub async fn should_be_successful() {
                 ],
                 message_input,
                 using_partitioning,
-                Some(HashMap::from([
+                Some(BTreeMap::from([
                     (
                         HeaderKey::try_from("key1").unwrap(),
                         HeaderValue::try_from("value1").unwrap(),
diff --git a/core/integration/tests/cli/message/test_message_send_from_file_command.rs b/core/integration/tests/cli/message/test_message_send_from_file_command.rs
index 1089bd7ff..eb45d5487 100644
--- a/core/integration/tests/cli/message/test_message_send_from_file_command.rs
+++ b/core/integration/tests/cli/message/test_message_send_from_file_command.rs
@@ -23,7 +23,7 @@ use bytes::Bytes;
 use iggy::prelude::*;
 use predicates::str::{ends_with, is_match, starts_with};
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::str::FromStr;
 use tokio::io::AsyncWriteExt;
 
@@ -34,7 +34,7 @@ pub(super) struct TestMessageSendFromFileCmd<'a> {
     topic_name: String,
     messages: Vec<&'a str>,
     message_count: usize,
-    headers: HashMap<HeaderKey, HeaderValue>,
+    headers: BTreeMap<HeaderKey, HeaderValue>,
     // These will be populated after creating the resources
     actual_stream_id: Option<u32>,
     actual_topic_id: Option<u32>,
@@ -49,7 +49,7 @@ impl<'a> TestMessageSendFromFileCmd<'a> {
         topic_name: &str,
         messages: &[&'a str],
         message_count: usize,
-        headers: HashMap<HeaderKey, HeaderValue>,
+        headers: BTreeMap<HeaderKey, HeaderValue>,
     ) -> Self {
         assert!(message_count <= messages.len());
         Self {
@@ -273,7 +273,7 @@ pub async fn should_be_successful() {
         "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa",
     ];
 
-    let test_headers = HashMap::from([
+    let test_headers = BTreeMap::from([
         (
             HeaderKey::from_str("HeaderKey1").unwrap(),
             HeaderValue::from_str("HeaderValue1").unwrap(),
diff --git a/core/integration/tests/server/scenarios/create_message_payload.rs b/core/integration/tests/server/scenarios/create_message_payload.rs
index f44a67333..d19c7aee4 100644
--- a/core/integration/tests/server/scenarios/create_message_payload.rs
+++ b/core/integration/tests/server/scenarios/create_message_payload.rs
@@ -19,7 +19,7 @@
 use bytes::Bytes;
 use iggy::prelude::*;
 use integration::harness::{TestHarness, assert_clean_system};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 
 const STREAM_NAME: &str = "test-stream";
 const TOPIC_NAME: &str = "test-topic";
@@ -137,8 +137,8 @@ fn create_message_payload(offset: u64) -> Bytes {
     Bytes::from(format!("message {offset}"))
 }
 
-fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> {
-    let mut headers = HashMap::new();
+fn create_message_headers() -> BTreeMap<HeaderKey, HeaderValue> {
+    let mut headers = BTreeMap::new();
     headers.insert(
         HeaderKey::try_from("key_1").unwrap(),
         HeaderValue::try_from("Value 1").unwrap(),
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs
index 6881f5f8c..8778e8fbd 100644
--- a/core/integration/tests/server/scenarios/encryption_scenario.rs
+++ b/core/integration/tests/server/scenarios/encryption_scenario.rs
@@ -21,7 +21,7 @@ use iggy::prelude::*;
 use iggy_common::TransportProtocol;
 use integration::harness::{TestHarness, TestServerConfig};
 use serial_test::parallel;
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use test_case::test_matrix;
@@ -63,7 +63,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
     let mut messages_batch_1 = Vec::new();
 
     for i in 0..messages_per_batch {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(HeaderKey::try_from("batch").unwrap(), 1u64.into());
         headers.insert(HeaderKey::try_from("index").unwrap(), i.into());
         headers.insert(
@@ -217,7 +217,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp
     let mut messages_batch_2 = Vec::new();
 
     for i in 0..messages_per_batch {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(HeaderKey::try_from("batch").unwrap(), 2u64.into());
         headers.insert(HeaderKey::try_from("index").unwrap(), i.into());
         headers.insert(
@@ -384,7 +384,7 @@ async fn should_encrypt_and_decrypt_headers_with_client_side_encryption(
 
     let mut messages = Vec::new();
     for i in 0..10i64 {
-        let mut headers = HashMap::new();
+        let mut headers = BTreeMap::new();
         headers.insert(HeaderKey::try_from("index").unwrap(), HeaderValue::from(i));
         headers.insert(
             HeaderKey::try_from("transport").unwrap(),
diff --git a/core/integration/tests/server/scenarios/message_headers_scenario.rs b/core/integration/tests/server/scenarios/message_headers_scenario.rs
index 714c56247..057572407 100644
--- a/core/integration/tests/server/scenarios/message_headers_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_headers_scenario.rs
@@ -22,7 +22,7 @@ use crate::server::scenarios::{
 use bytes::Bytes;
 use iggy::prelude::*;
 use integration::harness::{TestHarness, assert_clean_system};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 
 pub async fn run(harness: &TestHarness) {
     let client = harness
@@ -129,8 +129,8 @@ fn create_message_payload(offset: u64) -> Bytes {
     Bytes::from(format!("message {offset}"))
 }
 
-fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> {
-    let mut headers = HashMap::new();
+fn create_message_headers() -> BTreeMap<HeaderKey, HeaderValue> {
+    let mut headers = BTreeMap::new();
     headers.insert(
         HeaderKey::try_from("key_1").unwrap(),
         HeaderValue::try_from("Value 1").unwrap(),
diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs
index 14b240d6a..fb047ef2c 100644
--- a/core/integration/tests/server/scenarios/message_size_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_size_scenario.rs
@@ -19,7 +19,7 @@
 use bytes::Bytes;
 use iggy::prelude::*;
 use integration::harness::{TestHarness, assert_clean_system};
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 
 const STREAM_NAME: &str = "test-stream";
 const TOPIC_NAME: &str = "test-topic";
@@ -199,8 +199,8 @@ fn create_string_of_size(size: usize) -> String {
     "x".repeat(size)
 }
 
-fn create_message_header_of_size(target_size: usize) -> HashMap<HeaderKey, HeaderValue> {
-    let mut headers = HashMap::new();
+fn create_message_header_of_size(target_size: usize) -> BTreeMap<HeaderKey, HeaderValue> {
+    let mut headers = BTreeMap::new();
     let mut current_size = 0;
     let mut header_id: u32 = 0;
 
diff --git a/core/integration/tests/server/scenarios/offset_scenario.rs b/core/integration/tests/server/scenarios/offset_scenario.rs
index 282e355fa..15003a06b 100644
--- a/core/integration/tests/server/scenarios/offset_scenario.rs
+++ b/core/integration/tests/server/scenarios/offset_scenario.rs
@@ -19,9 +19,10 @@
 use super::PARTITION_ID;
 use bytes::BytesMut;
 use iggy::prelude::*;
-use iggy_common::user_headers_from_bytes;
+use iggy_binary_protocol::WireUserHeaders;
+use iggy_common::wire_conversions::user_headers_from_wire;
 use integration::harness::TestHarness;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 
 fn small_batches() -> Vec<u32> {
     vec![3, 4, 5, 6, 7]
@@ -173,7 +174,7 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
     payload.resize(message_size as usize, 0xD);
     let payload = payload.freeze();
 
-    let mut headers = HashMap::new();
+    let mut headers = BTreeMap::new();
     headers.insert(
         HeaderKey::try_from("key_1").unwrap(),
         HeaderValue::try_from("Value 1").unwrap(),
@@ -404,7 +405,8 @@ async fn verify_message_content(
             );
 
             if let Some(headers) = &msg.user_headers {
-                let headers_map = user_headers_from_bytes(headers.clone()).unwrap();
+                let headers_map =
+                    user_headers_from_wire(&WireUserHeaders::from_slice(headers).unwrap()).unwrap();
                 assert_eq!(headers_map.len(), 3, "Expected 3 headers");
             }
         }
diff --git a/core/integration/tests/server/scenarios/timestamp_scenario.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs
index 661970dd6..41b3be957 100644
--- a/core/integration/tests/server/scenarios/timestamp_scenario.rs
+++ b/core/integration/tests/server/scenarios/timestamp_scenario.rs
@@ -19,9 +19,10 @@
 use super::PARTITION_ID;
 use bytes::BytesMut;
 use iggy::prelude::*;
-use iggy_common::user_headers_from_bytes;
+use iggy_binary_protocol::WireUserHeaders;
+use iggy_common::wire_conversions::user_headers_from_wire;
 use integration::harness::TestHarness;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use tokio::time::{Duration, sleep};
 
 fn small_batches() -> Vec<u32> {
@@ -197,7 +198,7 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage {
     payload.resize(message_size as usize, 0xD);
     let payload = payload.freeze();
 
-    let mut headers = HashMap::new();
+    let mut headers = BTreeMap::new();
     headers.insert(
         HeaderKey::try_from("key_1").unwrap(),
         HeaderValue::try_from("Value 1").unwrap(),
@@ -428,7 +429,8 @@ async fn verify_message_content_by_timestamp(
             );
 
             if let Some(headers) = &msg.user_headers {
-                let headers_map = user_headers_from_bytes(headers.clone()).unwrap();
+                let headers_map =
+                    user_headers_from_wire(&WireUserHeaders::from_slice(headers).unwrap()).unwrap();
                 assert_eq!(headers_map.len(), 3, "Expected 3 headers");
             }
         }
diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs
index 72e426ce5..3cb66f07a 100644
--- a/core/sdk/src/prelude.rs
+++ b/core/sdk/src/prelude.rs
@@ -63,7 +63,6 @@ pub use iggy_common::{
     TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportEndpoints,
     TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig,
     WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, locking,
-    user_headers_from_bytes, user_headers_to_bytes,
 };
 pub use iggy_common::{
     Client, ClusterClient, ConsumerGroupClient, ConsumerOffsetClient, MessageClient,
diff --git a/core/tools/src/data-seeder/seeder.rs b/core/tools/src/data-seeder/seeder.rs
index 8a047bda1..0d60ef4ab 100644
--- a/core/tools/src/data-seeder/seeder.rs
+++ b/core/tools/src/data-seeder/seeder.rs
@@ -18,7 +18,7 @@
 
 use iggy::prelude::*;
 use rand::RngExt;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 
 const PROD_STREAM_NAME: &str = "prod";
 const TEST_STREAM_NAME: &str = "test";
@@ -155,7 +155,7 @@ async fn send_messages(client: &IggyClient, streams: &[(String, u32)]) -> Result
                     let headers = match rng.random_bool(0.5) {
                         false => None,
                         true => {
-                            let mut headers = HashMap::new();
+                            let mut headers = BTreeMap::new();
                             headers.insert(
                                 HeaderKey::try_from("key 1")?,
                                 HeaderValue::try_from("value1")?,
diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml
index 0645f553b..67005fd50 100644
--- a/examples/rust/Cargo.toml
+++ b/examples/rust/Cargo.toml
@@ -28,6 +28,7 @@ chrono = { workspace = true }
 clap = { workspace = true }
 futures-util = { workspace = true }
 iggy = { workspace = true }
+iggy_common = { workspace = true }
 lz4_flex = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
diff --git a/examples/rust/src/message-headers/message-compression/consumer/main.rs b/examples/rust/src/message-headers/message-compression/consumer/main.rs
index 3dc14d308..32e6050a4 100644
--- a/examples/rust/src/message-headers/message-compression/consumer/main.rs
+++ b/examples/rust/src/message-headers/message-compression/consumer/main.rs
@@ -19,6 +19,7 @@
 use bytes::Bytes;
 use futures_util::stream::StreamExt;
 use iggy::prelude::*;
+use iggy_common::wire_conversions::user_headers_to_wire;
 // The compression and decompression utilities are shared between the producer and consumer compression examples.
 // Hence, we import them here.
 use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, TOPIC_NAME};
@@ -76,12 +77,13 @@ fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), IggyError
         // remove the compression header since payload is now decompressed
         if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() {
             headers_map.remove(&Codec::header_key());
-            let headers_bytes = user_headers_to_bytes(&headers_map);
-            msg.message.header.user_headers_length = headers_bytes.len() as u32;
-            msg.message.user_headers = if headers_map.is_empty() {
-                None
+            if headers_map.is_empty() {
+                msg.message.header.user_headers_length = 0;
+                msg.message.user_headers = None;
             } else {
-                Some(headers_bytes)
+                let wire = user_headers_to_wire(&headers_map);
+                msg.message.header.user_headers_length = wire.as_bytes().len() as u32;
+                msg.message.user_headers = Some(wire.into_bytes());
             };
         }
     }
diff --git a/examples/rust/src/message-headers/message-compression/producer/main.rs b/examples/rust/src/message-headers/message-compression/producer/main.rs
index 5c258b314..ed06a0626 100644
--- a/examples/rust/src/message-headers/message-compression/producer/main.rs
+++ b/examples/rust/src/message-headers/message-compression/producer/main.rs
@@ -18,7 +18,7 @@
 
 use bytes::Bytes;
 use iggy::prelude::*;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 // The compression and decompression utilities are shared between the producer and consumer compression examples.
 // Hence, we import them here.
 use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, TOPIC_NAME};
@@ -59,7 +59,7 @@ async fn main() -> Result<(), IggyError> {
     // NOTE: This is where the Codec is used to prepare the compression user-header for the IggyMessage.
     let key = Codec::header_key();
     let value = codec.to_header_value();
-    let compression_headers = HashMap::from([(key, value)]);
+    let compression_headers = BTreeMap::from([(key, value)]);
 
     // Generate artificial example messages to send to the server.
     let mut messages = Vec::new();
diff --git a/examples/rust/src/message-headers/message-type/producer/main.rs b/examples/rust/src/message-headers/message-type/producer/main.rs
index f394fddfe..85596106f 100644
--- a/examples/rust/src/message-headers/message-type/producer/main.rs
+++ b/examples/rust/src/message-headers/message-type/producer/main.rs
@@ -22,7 +22,7 @@ use iggy::prelude::*;
 use iggy_examples::shared::args::Args;
 use iggy_examples::shared::messages_generator::MessagesGenerator;
 use iggy_examples::shared::system;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::error::Error;
 use std::sync::Arc;
 use tracing::info;
@@ -83,7 +83,7 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy
             let json = serializable_message.to_json();
 
             // The message type will be stored in the custom message header.
-            let mut headers = HashMap::new();
+            let mut headers = BTreeMap::new();
             headers.insert(
                 HeaderKey::try_from("message_type").unwrap(),
                 HeaderValue::try_from(message_type).unwrap(),
diff --git a/examples/rust/src/message-headers/typed-headers/producer/main.rs b/examples/rust/src/message-headers/typed-headers/producer/main.rs
index 3e6d3c5a8..a9a36b730 100644
--- a/examples/rust/src/message-headers/typed-headers/producer/main.rs
+++ b/examples/rust/src/message-headers/typed-headers/producer/main.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
 use iggy::prelude::*;
 use iggy_examples::shared::args::Args;
 use iggy_examples::shared::system;
-use std::collections::HashMap;
+use std::collections::BTreeMap;
 use std::error::Error;
 use std::sync::Arc;
 use tracing::info;
@@ -78,7 +78,7 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy
         for _ in 0..args.messages_per_batch {
             message_id += 1;
 
-            let mut headers = HashMap::new();
+            let mut headers = BTreeMap::new();
             headers.insert(
                 HeaderKey::try_from("event_type")?,
                 HeaderValue::try_from("user_action")?,


Reply via email to