This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit e409be00a6ccd5604a5d047f9560c5f91212ca09 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 31 12:52:22 2026 +0200 refactor(server,sdk): add wire validation for user headers and fix encoding defects User headers had several defects: no structural validation on server ingest (garbage bytes were stored and only surfaced at consumer read time as a silent Ok(None)), non-deterministic encoding caused by HashMap iteration order, affecting the HTTP path, the wrong error type for an unknown header kind (IggyError::InvalidCommand), a double heap allocation per field in the decoder, and no buffer pre-allocation in the encoder. Add wire-level TLV validation primitives to binary_protocol: a WireHeaderKind(u8) newtype (deliberately not an exhaustive enum, so nodes can still validate and forward messages carrying kind codes they do not yet understand during VSR rolling upgrades), validate_user_headers() for structural checks, WireUserHeaderIterator for zero-copy iteration, the pre-validated WireUserHeaders buffer type, and encode/size helpers. The server now validates header structure on ingest, before persistence. Migrate UserHeaders from HashMap to BTreeMap for a deterministic serialization order. Rewrite user_headers_to_bytes to use a pre-allocated buffer and the binary_protocol encoder. Rewrite user_headers_from_bytes to take &[u8] and use the zero-copy iterator, fixing the double .to_vec() allocation bug. Fix the silent Ok(None) in both user_headers_map() impls so that errors are propagated. Replace IggyError::InvalidCommand with a new InvalidHeaderKind(u8) variant (error code 4038). 
--- Cargo.lock | 2 + DEPENDENCIES.md | 86 ++-- core/binary_protocol/src/lib.rs | 4 + core/binary_protocol/src/primitives/mod.rs | 1 + .../binary_protocol/src/primitives/user_headers.rs | 544 +++++++++++++++++++++ core/cli/Cargo.toml | 1 + .../src/commands/binary_message/poll_messages.rs | 9 +- .../src/commands/binary_message/send_messages.rs | 4 +- core/common/src/error/iggy_error.rs | 2 + core/common/src/http/messages/send_messages.rs | 4 +- core/common/src/types/message/iggy_message.rs | 72 +-- core/common/src/types/message/message_view.rs | 26 +- core/common/src/types/message/mod.rs | 3 +- core/common/src/types/message/user_headers.rs | 170 ++----- core/common/src/utils/crypto.rs | 4 +- core/common/src/wire_conversions.rs | 75 ++- core/connectors/runtime/src/sink.rs | 2 +- core/connectors/runtime/src/source.rs | 4 +- core/connectors/sdk/src/lib.rs | 10 +- core/connectors/sinks/http_sink/src/lib.rs | 3 +- .../tests/cli/message/test_message_poll_command.rs | 4 +- .../message/test_message_poll_to_file_command.rs | 8 +- .../cli/message/test_message_reply_via_file.rs | 4 +- .../tests/cli/message/test_message_send_command.rs | 8 +- .../message/test_message_send_from_file_command.rs | 8 +- .../server/scenarios/create_message_payload.rs | 6 +- .../tests/server/scenarios/encryption_scenario.rs | 8 +- .../server/scenarios/message_headers_scenario.rs | 6 +- .../server/scenarios/message_size_scenario.rs | 6 +- .../tests/server/scenarios/offset_scenario.rs | 10 +- .../tests/server/scenarios/timestamp_scenario.rs | 10 +- core/sdk/src/prelude.rs | 1 - core/tools/src/data-seeder/seeder.rs | 4 +- examples/rust/Cargo.toml | 1 + .../message-compression/consumer/main.rs | 12 +- .../message-compression/producer/main.rs | 4 +- .../message-headers/message-type/producer/main.rs | 4 +- .../message-headers/typed-headers/producer/main.rs | 4 +- 38 files changed, 848 insertions(+), 286 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 66d8271d2..b5c76c369 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -5305,6 +5305,7 @@ dependencies = [ "dirs", "figlet-rs", "iggy", + "iggy_binary_protocol", "iggy_common", "keyring", "passterm", @@ -5739,6 +5740,7 @@ dependencies = [ "clap", "futures-util", "iggy", + "iggy_common", "lz4_flex 0.13.0", "rand 0.10.0", "serde", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 9b019c26b..0135e7ed2 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -80,7 +80,7 @@ av-scenechange: 0.14.1, "MIT", av1-grain: 0.2.5, "BSD-2-Clause", avif-serialize: 0.8.8, "BSD-3-Clause", aws-lc-rs: 1.16.2, "(Apache-2.0 OR ISC) AND ISC", -aws-lc-sys: 0.39.0, "(Apache-2.0 OR ISC OR MIT) AND (Apache-2.0 OR ISC OR MIT-0) AND (Apache-2.0 OR ISC) AND Apache-2.0 AND BSD-3-Clause AND ISC AND MIT", +aws-lc-sys: 0.39.1, "(Apache-2.0 OR ISC OR MIT) AND (Apache-2.0 OR ISC OR MIT-0) AND (Apache-2.0 OR ISC) AND Apache-2.0 AND BSD-3-Clause AND ISC AND MIT", axum: 0.8.8, "MIT", axum-core: 0.5.6, "MIT", axum-macros: 0.5.0, "MIT", @@ -112,7 +112,7 @@ bitflags: 2.11.0, "Apache-2.0 OR MIT", bitstream-io: 4.9.0, "Apache-2.0 OR MIT", bitvec: 1.0.1, "MIT", blake2: 0.10.6, "Apache-2.0 OR MIT", -blake3: 1.8.3, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0", +blake3: 1.8.4, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0", block-buffer: 0.10.4, "Apache-2.0 OR MIT", block2: 0.6.2, "MIT", bnum: 0.12.1, "Apache-2.0 OR MIT", @@ -145,7 +145,7 @@ capacity_builder: 0.5.0, "MIT", capacity_builder_macros: 0.3.0, "MIT", cargo-platform: 0.3.2, "Apache-2.0 OR MIT", cargo_metadata: 0.23.1, "MIT", -cc: 1.2.57, "Apache-2.0 OR MIT", +cc: 1.2.58, "Apache-2.0 OR MIT", cesu8: 1.1.0, "Apache-2.0 OR MIT", cexpr: 0.6.0, "Apache-2.0 OR MIT", cfg-if: 1.0.4, "Apache-2.0 OR MIT", @@ -162,7 +162,7 @@ clap_complete: 4.6.0, "Apache-2.0 OR MIT", clap_derive: 4.6.0, "Apache-2.0 OR MIT", clap_lex: 1.1.0, "Apache-2.0 OR MIT", clock: 0.1.0, "N/A", -cmake: 0.1.57, "Apache-2.0 OR MIT", +cmake: 0.1.58, "Apache-2.0 OR MIT", cobs: 0.3.0, "Apache-2.0 OR MIT", color_quant: 1.1.0, 
"MIT", colorchoice: 1.0.5, "Apache-2.0 OR MIT", @@ -171,13 +171,13 @@ combine: 4.6.7, "MIT", comfy-table: 7.2.2, "MIT", compio: 0.18.0, "MIT", compio-buf: 0.8.1, "MIT", -compio-driver: 0.11.3, "MIT", +compio-driver: 0.11.4, "MIT", compio-fs: 0.11.0, "MIT", compio-io: 0.9.1, "MIT", compio-log: 0.1.0, "MIT", compio-macros: 0.1.2, "MIT", compio-net: 0.11.1, "MIT", -compio-quic: 0.7.1, "MIT", +compio-quic: 0.7.2, "MIT", compio-runtime: 0.11.0, "MIT", compio-tls: 0.9.0, "MIT", compio-ws: 0.3.0, "MIT", @@ -270,7 +270,7 @@ derive_more: 2.1.1, "MIT", derive_more-impl: 2.1.1, "MIT", difflib: 0.4.0, "MIT", digest: 0.10.7, "Apache-2.0 OR MIT", -dircpy: 0.3.19, "MIT", +dircpy: 0.3.20, "MIT", dirs: 6.0.0, "Apache-2.0 OR MIT", dirs-sys: 0.5.0, "Apache-2.0 OR MIT", dispatch2: 0.3.1, "Apache-2.0 OR MIT OR Zlib", @@ -430,7 +430,7 @@ human-repr: 1.1.0, "MIT", humantime: 2.3.0, "Apache-2.0 OR MIT", hwlocality: 1.0.0-alpha.11, "MIT", hwlocality-sys: 0.6.4, "MIT", -hyper: 1.8.1, "MIT", +hyper: 1.9.0, "MIT", hyper-named-pipe: 0.1.0, "Apache-2.0", hyper-rustls: 0.27.7, "Apache-2.0 OR ISC OR MIT", hyper-timeout: 0.5.2, "Apache-2.0 OR MIT", @@ -493,13 +493,13 @@ inout: 0.1.4, "Apache-2.0 OR MIT", integer-encoding: 3.0.4, "MIT", integration: 0.0.1, "Apache-2.0", interpolate_name: 0.2.4, "MIT", -inventory: 0.3.22, "Apache-2.0 OR MIT", +inventory: 0.3.24, "Apache-2.0 OR MIT", io-uring: 0.7.11, "Apache-2.0 OR MIT", io_uring_buf_ring: 0.2.3, "MIT", iobuf: 0.1.0, "Apache-2.0", ipconfig: 0.3.4, "Apache-2.0 OR MIT", ipnet: 2.12.0, "Apache-2.0 OR MIT", -iri-string: 0.7.11, "Apache-2.0 OR MIT", +iri-string: 0.7.12, "Apache-2.0 OR MIT", is_terminal_polyfill: 1.70.2, "Apache-2.0 OR MIT", itertools: 0.13.0, "Apache-2.0 OR MIT", itertools: 0.14.0, "Apache-2.0 OR MIT", @@ -514,7 +514,7 @@ jni-sys: 0.4.1, "Apache-2.0 OR MIT", jni-sys-macros: 0.4.1, "Apache-2.0 OR MIT", jobserver: 0.1.34, "Apache-2.0 OR MIT", journal: 0.1.0, "Apache-2.0", -js-sys: 0.3.91, "Apache-2.0 OR MIT", +js-sys: 0.3.94, "Apache-2.0 
OR MIT", jsonwebtoken: 10.3.0, "MIT", jwalk: 0.8.1, "MIT", keccak: 0.1.6, "Apache-2.0 OR MIT", @@ -536,7 +536,7 @@ lexical-util: 1.0.7, "Apache-2.0 OR MIT", lexical-write-float: 1.0.6, "Apache-2.0 OR MIT", lexical-write-integer: 1.0.6, "Apache-2.0 OR MIT", libbz2-rs-sys: 0.2.2, "bzip2-1.0.6", -libc: 0.2.183, "Apache-2.0 OR MIT", +libc: 0.2.184, "Apache-2.0 OR MIT", libdbus-sys: 0.2.7, "Apache-2.0 OR MIT", libfuzzer-sys: 0.4.12, "(Apache-2.0 OR MIT) AND NCSA", libgit2-sys: 0.18.3+1.9.2, "Apache-2.0 OR MIT", @@ -545,7 +545,7 @@ liblzma: 0.4.6, "Apache-2.0 OR MIT", liblzma-sys: 0.4.5, "Apache-2.0 OR MIT", libm: 0.2.16, "MIT", libmimalloc-sys: 0.1.44, "MIT", -libredox: 0.1.14, "MIT", +libredox: 0.1.15, "MIT", libsqlite3-sys: 0.30.1, "MIT", libz-sys: 1.1.25, "Apache-2.0 OR MIT", linked-hash-map: 0.5.6, "Apache-2.0 OR MIT", @@ -586,7 +586,7 @@ mime: 0.3.17, "Apache-2.0 OR MIT", mime_guess: 2.0.5, "MIT", minimal-lexical: 0.2.1, "Apache-2.0 OR MIT", miniz_oxide: 0.8.9, "Apache-2.0 OR MIT OR Zlib", -mio: 1.1.1, "MIT", +mio: 1.2.0, "MIT", mockall: 0.14.0, "Apache-2.0 OR MIT", mockall_derive: 0.14.0, "Apache-2.0 OR MIT", moka: 0.12.15, "(Apache-2.0 OR MIT) AND Apache-2.0", @@ -616,7 +616,7 @@ num: 0.4.3, "Apache-2.0 OR MIT", num-bigint: 0.4.6, "Apache-2.0 OR MIT", num-bigint-dig: 0.8.6, "Apache-2.0 OR MIT", num-complex: 0.4.6, "Apache-2.0 OR MIT", -num-conv: 0.2.0, "Apache-2.0 OR MIT", +num-conv: 0.2.1, "Apache-2.0 OR MIT", num-derive: 0.4.2, "Apache-2.0 OR MIT", num-integer: 0.1.46, "Apache-2.0 OR MIT", num-iter: 0.1.45, "Apache-2.0 OR MIT", @@ -630,7 +630,7 @@ objc2: 0.6.4, "MIT", objc2-core-foundation: 0.3.2, "Apache-2.0 OR MIT OR Zlib", objc2-encode: 4.1.0, "MIT", objc2-io-kit: 0.3.2, "Apache-2.0 OR MIT OR Zlib", -octocrab: 0.49.6, "Apache-2.0 OR MIT", +octocrab: 0.49.7, "Apache-2.0 OR MIT", oid-registry: 0.8.1, "Apache-2.0 OR MIT", once_cell: 1.21.4, "Apache-2.0 OR MIT", once_cell_polyfill: 1.70.2, "Apache-2.0 OR MIT", @@ -656,7 +656,7 @@ os_pipe: 1.2.3, "MIT", outref: 
0.5.2, "MIT", p256: 0.13.2, "Apache-2.0 OR MIT", p384: 0.13.1, "Apache-2.0 OR MIT", -papaya: 0.2.3, "MIT", +papaya: 0.2.4, "MIT", parking: 2.2.1, "Apache-2.0 OR MIT", parking_lot: 0.12.5, "Apache-2.0 OR MIT", parking_lot_core: 0.9.12, "Apache-2.0 OR MIT", @@ -718,7 +718,7 @@ proc-macro2: 1.0.106, "Apache-2.0 OR MIT", proc-macro2-diagnostics: 0.10.1, "Apache-2.0 OR MIT", profiling: 1.0.17, "Apache-2.0 OR MIT", profiling-procmacros: 1.0.17, "Apache-2.0 OR MIT", -prometheus-client: 0.24.0, "Apache-2.0 OR MIT", +prometheus-client: 0.24.1, "Apache-2.0 OR MIT", prometheus-client-derive-encode: 0.5.0, "Apache-2.0 OR MIT", prost: 0.14.3, "Apache-2.0", prost-derive: 0.14.3, "Apache-2.0", @@ -782,8 +782,8 @@ ring: 0.17.14, "Apache-2.0 AND ISC", ringbuffer: 0.16.0, "MIT", rkyv: 0.7.46, "MIT", rkyv_derive: 0.7.46, "MIT", -rmcp: 1.2.0, "Apache-2.0", -rmcp-macros: 1.2.0, "Apache-2.0", +rmcp: 1.3.0, "Apache-2.0", +rmcp-macros: 1.3.0, "Apache-2.0", rmp: 0.8.15, "MIT", rmp-serde: 1.3.1, "MIT", roaring: 0.11.3, "Apache-2.0 OR MIT", @@ -795,8 +795,8 @@ rust-embed: 8.11.0, "MIT", rust-embed-impl: 8.11.0, "MIT", rust-embed-utils: 8.11.0, "MIT", rust-ini: 0.21.3, "MIT", -rust_decimal: 1.40.0, "MIT", -rustc-hash: 2.1.1, "Apache-2.0 OR MIT", +rust_decimal: 1.41.0, "MIT", +rustc-hash: 2.1.2, "Apache-2.0 OR MIT", rustc_version: 0.4.1, "Apache-2.0 OR MIT", rustc_version_runtime: 0.3.0, "MIT", rusticata-macros: 4.1.0, "Apache-2.0 OR MIT", @@ -842,7 +842,7 @@ serde_json: 1.0.149, "Apache-2.0 OR MIT", serde_path_to_error: 0.1.20, "Apache-2.0 OR MIT", serde_repr: 0.1.20, "Apache-2.0 OR MIT", serde_spanned: 0.6.9, "Apache-2.0 OR MIT", -serde_spanned: 1.1.0, "Apache-2.0 OR MIT", +serde_spanned: 1.1.1, "Apache-2.0 OR MIT", serde_urlencoded: 0.7.1, "Apache-2.0 OR MIT", serde_v8: 0.260.0, "MIT", serde_with: 3.18.0, "Apache-2.0 OR MIT", @@ -859,7 +859,7 @@ sharded-slab: 0.1.7, "MIT", shlex: 1.3.0, "Apache-2.0 OR MIT", signal-hook-registry: 1.4.8, "Apache-2.0 OR MIT", signature: 2.2.0, "Apache-2.0 OR 
MIT", -simd-adler32: 0.3.8, "MIT", +simd-adler32: 0.3.9, "MIT", simd-json: 0.17.0, "Apache-2.0 OR MIT", simd_helpers: 0.1.0, "MIT", simdutf8: 0.1.5, "Apache-2.0 OR MIT", @@ -907,12 +907,12 @@ svgtypes: 0.15.3, "Apache-2.0 OR MIT", syn: 1.0.109, "Apache-2.0 OR MIT", syn: 2.0.117, "Apache-2.0 OR MIT", sync_wrapper: 1.0.2, "Apache-2.0", -synchrony: 0.1.6, "MIT", +synchrony: 0.1.7, "MIT", synstructure: 0.13.2, "MIT", synthez: 0.4.0, "BlueOak-1.0.0", synthez-codegen: 0.4.0, "BlueOak-1.0.0", synthez-core: 0.4.0, "BlueOak-1.0.0", -sys_traits: 0.1.26, "MIT", +sys_traits: 0.1.27, "MIT", sys_traits_macros: 0.1.0, "MIT", sysinfo: 0.37.2, "MIT", sysinfo: 0.38.4, "MIT", @@ -954,15 +954,15 @@ tokio-tungstenite: 0.29.0, "MIT", tokio-util: 0.7.18, "MIT", tokise: 0.2.1, "Apache-2.0 OR MIT", toml: 0.8.23, "Apache-2.0 OR MIT", -toml: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT", +toml: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT", toml_datetime: 0.6.11, "Apache-2.0 OR MIT", -toml_datetime: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT", +toml_datetime: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT", toml_edit: 0.19.15, "Apache-2.0 OR MIT", toml_edit: 0.22.27, "Apache-2.0 OR MIT", -toml_edit: 0.25.8+spec-1.1.0, "Apache-2.0 OR MIT", -toml_parser: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT", +toml_edit: 0.25.9+spec-1.1.0, "Apache-2.0 OR MIT", +toml_parser: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT", toml_write: 0.1.2, "Apache-2.0 OR MIT", -toml_writer: 1.1.0+spec-1.1.0, "Apache-2.0 OR MIT", +toml_writer: 1.1.1+spec-1.1.0, "Apache-2.0 OR MIT", tonic: 0.14.5, "MIT", tonic-prost: 0.14.5, "MIT", tools: 0.1.0, "Apache-2.0", @@ -1007,7 +1007,7 @@ unicode-linebreak: 0.1.5, "Apache-2.0", unicode-normalization: 0.1.25, "Apache-2.0 OR MIT", unicode-properties: 0.1.4, "Apache-2.0 OR MIT", unicode-script: 0.5.8, "Apache-2.0 OR MIT", -unicode-segmentation: 1.13.0, "Apache-2.0 OR MIT", +unicode-segmentation: 1.13.2, "Apache-2.0 OR MIT", unicode-vo: 0.1.0, "Apache-2.0 OR MIT", unicode-width: 0.1.14, "Apache-2.0 OR MIT", unicode-width: 0.2.2, 
"Apache-2.0 OR MIT", @@ -1025,7 +1025,7 @@ utf8-width: 0.1.8, "MIT", utf8-zero: 0.8.1, "Apache-2.0 OR MIT", utf8_iter: 1.0.4, "Apache-2.0 OR MIT", utf8parse: 0.2.2, "Apache-2.0 OR MIT", -uuid: 1.22.0, "Apache-2.0 OR MIT", +uuid: 1.23.0, "Apache-2.0 OR MIT", v8: 137.3.0, "MIT", v_frame: 0.3.9, "BSD-2-Clause", v_htmlescape: 0.15.8, "Apache-2.0 OR MIT", @@ -1045,11 +1045,11 @@ wasi: 0.11.1+wasi-snapshot-preview1, "Apache-2.0 OR Apache-2.0 WITH LLVM-excepti wasip2: 1.0.2+wasi-0.2.9, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wasip3: 0.4.0+wasi-0.3.0-rc-2026-01-06, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wasite: 0.1.0, "Apache-2.0 OR BSL-1.0 OR MIT", -wasm-bindgen: 0.2.114, "Apache-2.0 OR MIT", -wasm-bindgen-futures: 0.4.64, "Apache-2.0 OR MIT", -wasm-bindgen-macro: 0.2.114, "Apache-2.0 OR MIT", -wasm-bindgen-macro-support: 0.2.114, "Apache-2.0 OR MIT", -wasm-bindgen-shared: 0.2.114, "Apache-2.0 OR MIT", +wasm-bindgen: 0.2.117, "Apache-2.0 OR MIT", +wasm-bindgen-futures: 0.4.67, "Apache-2.0 OR MIT", +wasm-bindgen-macro: 0.2.117, "Apache-2.0 OR MIT", +wasm-bindgen-macro-support: 0.2.117, "Apache-2.0 OR MIT", +wasm-bindgen-shared: 0.2.117, "Apache-2.0 OR MIT", wasm-encoder: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wasm-metadata: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wasm-streams: 0.4.2, "Apache-2.0 OR MIT", @@ -1057,7 +1057,7 @@ wasm-streams: 0.5.0, "Apache-2.0 OR MIT", wasm_dep_analyzer: 0.3.0, "MIT", wasmparser: 0.244.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wasmtimer: 0.4.3, "MIT", -web-sys: 0.3.91, "Apache-2.0 OR MIT", +web-sys: 0.3.94, "Apache-2.0 OR MIT", web-time: 1.1.0, "Apache-2.0 OR MIT", webpki-root-certs: 1.0.6, "CDLA-Permissive-2.0", webpki-roots: 0.26.11, "CDLA-Permissive-2.0", @@ -1133,7 +1133,7 @@ windows_x86_64_msvc: 0.52.6, "Apache-2.0 OR MIT", windows_x86_64_msvc: 0.53.1, "Apache-2.0 OR MIT", winnow: 0.5.40, "MIT", winnow: 0.7.15, "MIT", -winnow: 1.0.0, "MIT", 
+winnow: 1.0.1, "MIT", winsafe: 0.0.19, "MIT", wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", @@ -1155,8 +1155,8 @@ yew-router: 0.20.0, "Apache-2.0 OR MIT", yew-router-macro: 0.20.0, "Apache-2.0 OR MIT", yoke: 0.8.1, "Unicode-3.0", yoke-derive: 0.8.1, "Unicode-3.0", -zerocopy: 0.8.47, "Apache-2.0 OR BSD-2-Clause OR MIT", -zerocopy-derive: 0.8.47, "Apache-2.0 OR BSD-2-Clause OR MIT", +zerocopy: 0.8.48, "Apache-2.0 OR BSD-2-Clause OR MIT", +zerocopy-derive: 0.8.48, "Apache-2.0 OR BSD-2-Clause OR MIT", zerofrom: 0.1.6, "Unicode-3.0", zerofrom-derive: 0.1.6, "Unicode-3.0", zeroize: 1.8.2, "Apache-2.0 OR MIT", @@ -1176,4 +1176,4 @@ zune-core: 0.4.12, "Apache-2.0 OR MIT OR Zlib", zune-core: 0.5.1, "Apache-2.0 OR MIT OR Zlib", zune-inflate: 0.2.54, "Apache-2.0 OR MIT OR Zlib", zune-jpeg: 0.4.21, "Apache-2.0 OR MIT OR Zlib", -zune-jpeg: 0.5.14, "Apache-2.0 OR MIT OR Zlib", +zune-jpeg: 0.5.15, "Apache-2.0 OR MIT OR Zlib", diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs index 62476f3ea..a63aad99a 100644 --- a/core/binary_protocol/src/lib.rs +++ b/core/binary_protocol/src/lib.rs @@ -87,6 +87,10 @@ pub use primitives::permissions::{ WireGlobalPermissions, WirePermissions, WireStreamPermissions, WireTopicPermissions, }; pub use primitives::polling_strategy::WirePollingStrategy; +pub use primitives::user_headers::{ + WireHeaderKind, WireUserHeaderEntry, WireUserHeaderIterator, WireUserHeaders, + encode_user_headers, user_headers_encoded_size, validate_user_headers, +}; /// Maximum number of partitions allowed in a single create/delete request. 
pub const MAX_PARTITIONS_PER_REQUEST: u32 = 1000; diff --git a/core/binary_protocol/src/primitives/mod.rs b/core/binary_protocol/src/primitives/mod.rs index dcbef623e..d1d7ee56a 100644 --- a/core/binary_protocol/src/primitives/mod.rs +++ b/core/binary_protocol/src/primitives/mod.rs @@ -22,3 +22,4 @@ pub mod identifier; pub mod partitioning; pub mod permissions; pub mod polling_strategy; +pub mod user_headers; diff --git a/core/binary_protocol/src/primitives/user_headers.rs b/core/binary_protocol/src/primitives/user_headers.rs new file mode 100644 index 000000000..2f49b39d6 --- /dev/null +++ b/core/binary_protocol/src/primitives/user_headers.rs @@ -0,0 +1,544 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Wire-level primitives for user header TLV validation, iteration, and encoding. +//! +//! User headers are encoded as a sequence of key-value TLV pairs: +//! ```text +//! [key_kind:u8][key_len:u32_le][key_data:N][val_kind:u8][val_len:u32_le][val_data:M] ... +//! ``` +//! +//! This module provides structural validation and zero-copy iteration +//! over these bytes without interpreting semantic meaning of kind codes. +//! Domain-level interpretation (mapping kind codes to typed values) +//! 
happens in `iggy_common`. + +use std::borrow::Cow; + +use bytes::{BufMut, Bytes, BytesMut}; + +use crate::WireError; +use crate::codec::{WireEncode, read_u8, read_u32_le}; + +/// Maximum byte length for a single header key or value on the wire. +const MAX_FIELD_LENGTH: u32 = 255; + +/// Opaque header kind tag as transmitted on the wire. +/// +/// This is a `u8` newtype - NOT an exhaustive enum. New kind codes can be +/// added without breaking structural validation, which is critical for +/// VSR rolling upgrades where older nodes must forward messages containing +/// kind codes they do not yet understand. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct WireHeaderKind(pub u8); + +impl WireHeaderKind { + /// Highest currently defined kind code (Float64 = 15). + pub const KNOWN_MAX: u8 = 15; + + /// Returns `true` if the kind code is within the currently defined range. + #[must_use] + pub fn is_known(self) -> bool { + (1..=Self::KNOWN_MAX).contains(&self.0) + } +} + +/// A single key-value header entry borrowing the underlying buffer. +#[derive(Debug)] +pub struct WireUserHeaderEntry<'a> { + pub key_kind: WireHeaderKind, + pub key: &'a [u8], + pub value_kind: WireHeaderKind, + pub value: &'a [u8], +} + +/// Validate the structural integrity of a user headers byte buffer. +/// +/// Walks TLV pairs and checks: +/// - Every kind byte is non-zero +/// - Every length is in `1..=255` +/// - No buffer overrun +/// - Total consumed bytes equals `buf.len()` (no trailing garbage) +/// - Even number of TLV entries (each pair = key + value) +/// +/// Returns the number of complete key-value pairs on success. +/// +/// # Errors +/// +/// Returns `WireError::UnexpectedEof` if the buffer is truncated mid-entry, +/// or `WireError::Validation` if structural constraints are violated. 
+pub fn validate_user_headers(buf: &[u8]) -> Result<u32, WireError> { + if buf.is_empty() { + return Ok(0); + } + + let mut pos = 0; + let mut tlv_count: u32 = 0; + + while pos < buf.len() { + let kind = read_u8(buf, pos)?; + if kind == 0 { + return Err(WireError::Validation(Cow::Owned(format!( + "header kind is 0 (reserved) at offset {pos}" + )))); + } + pos += 1; + + let length = read_u32_le(buf, pos)?; + pos += 4; + + if length == 0 || length > MAX_FIELD_LENGTH { + return Err(WireError::Validation(Cow::Owned(format!( + "header field length {length} out of range 1..={MAX_FIELD_LENGTH} at offset {}", + pos - 4 + )))); + } + + let data_end = pos + .checked_add(length as usize) + .ok_or(WireError::Validation(Cow::Borrowed( + "header field length overflow", + )))?; + if data_end > buf.len() { + return Err(WireError::UnexpectedEof { + offset: pos, + need: length as usize, + have: buf.len() - pos, + }); + } + pos = data_end; + + tlv_count = tlv_count + .checked_add(1) + .ok_or(WireError::Validation(Cow::Borrowed( + "header entry count overflow", + )))?; + } + + if !tlv_count.is_multiple_of(2) { + return Err(WireError::Validation(Cow::Owned(format!( + "odd number of TLV entries ({tlv_count}), expected key-value pairs" + )))); + } + + Ok(tlv_count / 2) +} + +/// Zero-copy iterator over pre-validated user header bytes. +/// +/// Yields one [`WireUserHeaderEntry`] per key-value pair. The buffer +/// **must** have been validated by [`validate_user_headers`] first; +/// iterating an invalid buffer may yield garbage or panic. +pub struct WireUserHeaderIterator<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> WireUserHeaderIterator<'a> { + /// Create an iterator over a pre-validated user headers buffer. 
+ #[must_use] + pub const fn new(buf: &'a [u8]) -> Self { + Self { buf, pos: 0 } + } +} + +impl<'a> Iterator for WireUserHeaderIterator<'a> { + type Item = WireUserHeaderEntry<'a>; + + fn next(&mut self) -> Option<Self::Item> { + if self.pos >= self.buf.len() { + return None; + } + + // Key TLV + let key_kind = self.buf[self.pos]; + self.pos += 1; + + let key_len = u32::from_le_bytes( + self.buf[self.pos..self.pos + 4] + .try_into() + .expect("pre-validated buffer"), + ) as usize; + self.pos += 4; + + let key = &self.buf[self.pos..self.pos + key_len]; + self.pos += key_len; + + // Value TLV + let value_kind = self.buf[self.pos]; + self.pos += 1; + + let value_len = u32::from_le_bytes( + self.buf[self.pos..self.pos + 4] + .try_into() + .expect("pre-validated buffer"), + ) as usize; + self.pos += 4; + + let value = &self.buf[self.pos..self.pos + value_len]; + self.pos += value_len; + + Some(WireUserHeaderEntry { + key_kind: WireHeaderKind(key_kind), + key, + value_kind: WireHeaderKind(value_kind), + value, + }) + } +} + +/// Calculate the exact encoded size for a set of header entries. +/// +/// Each entry tuple is `(key_kind, key_data, value_kind, value_data)`. +#[must_use] +pub fn user_headers_encoded_size(entries: &[(u8, &[u8], u8, &[u8])]) -> usize { + entries + .iter() + .map(|(_, k, _, v)| 1 + 4 + k.len() + 1 + 4 + v.len()) + .sum() +} + +/// Encode header entries into a caller-owned buffer. +/// +/// Each entry tuple is `(key_kind, key_data, value_kind, value_data)`. +/// The caller should pre-allocate `buf` with [`user_headers_encoded_size`]. 
+pub fn encode_user_headers(entries: &[(u8, &[u8], u8, &[u8])], buf: &mut BytesMut) { + for &(key_kind, key_data, value_kind, value_data) in entries { + buf.put_u8(key_kind); + #[allow(clippy::cast_possible_truncation)] + buf.put_u32_le(key_data.len() as u32); + buf.put_slice(key_data); + buf.put_u8(value_kind); + #[allow(clippy::cast_possible_truncation)] + buf.put_u32_le(value_data.len() as u32); + buf.put_slice(value_data); + } +} + +/// Pre-validated user headers as a contiguous TLV byte buffer. +/// +/// Construction validates structural integrity (no partial TLVs, no zero kinds, +/// no oversized fields, even entry count). The inner bytes are immutable. +/// +/// This type is intentionally opaque at the wire layer. Domain-level +/// interpretation of kind codes happens in `iggy_common` via the conversion +/// bridge in `wire_conversions.rs`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WireUserHeaders(Bytes); + +impl WireUserHeaders { + /// Validate and wrap a borrowed byte slice (copies into new `Bytes`). + /// + /// # Errors + /// Returns `WireError` if the buffer is not structurally valid TLV. + pub fn from_slice(buf: &[u8]) -> Result<Self, WireError> { + validate_user_headers(buf)?; + Ok(Self(Bytes::copy_from_slice(buf))) + } + + /// Validate and wrap an owned `Bytes` buffer (zero-copy). + /// + /// # Errors + /// Returns `WireError` if the buffer is not structurally valid TLV. + pub fn from_bytes(buf: Bytes) -> Result<Self, WireError> { + validate_user_headers(&buf)?; + Ok(Self(buf)) + } + + /// Wrap pre-validated bytes without re-checking. + /// + /// # Safety contract (not `unsafe`, but caller must ensure): + /// The bytes must be structurally valid TLV as defined by + /// [`validate_user_headers`]. Iterating invalid bytes will panic. + pub const fn from_validated(buf: Bytes) -> Self { + Self(buf) + } + + /// Empty headers (zero-length buffer). 
+ #[must_use] + pub const fn empty() -> Self { + Self(Bytes::new()) + } + + #[must_use] + pub const fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// The raw validated bytes, suitable for wire transmission. + #[must_use] + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } + + /// Consume into the inner `Bytes` handle (zero-copy). + #[must_use] + pub fn into_bytes(self) -> Bytes { + self.0 + } + + /// Zero-copy iteration over the pre-validated entries. + #[must_use] + pub fn iter(&self) -> WireUserHeaderIterator<'_> { + WireUserHeaderIterator::new(&self.0) + } +} + +impl<'a> IntoIterator for &'a WireUserHeaders { + type Item = WireUserHeaderEntry<'a>; + type IntoIter = WireUserHeaderIterator<'a>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl WireEncode for WireUserHeaders { + fn encoded_size(&self) -> usize { + self.0.len() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_slice(&self.0); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn encode_test_entries(entries: &[(u8, &[u8], u8, &[u8])]) -> Vec<u8> { + let size = user_headers_encoded_size(entries); + let mut buf = BytesMut::with_capacity(size); + encode_user_headers(entries, &mut buf); + buf.to_vec() + } + + #[test] + fn empty_buffer_is_valid() { + assert_eq!(validate_user_headers(&[]).unwrap(), 0); + } + + #[test] + fn single_pair_round_trip() { + let entries = [(6u8, b"test" as &[u8], 2u8, b"val" as &[u8])]; + let encoded = encode_test_entries(&entries); + + assert_eq!(validate_user_headers(&encoded).unwrap(), 1); + + let mut iter = WireUserHeaderIterator::new(&encoded); + let entry = iter.next().unwrap(); + assert_eq!(entry.key_kind, WireHeaderKind(6)); + assert_eq!(entry.key, b"test"); + assert_eq!(entry.value_kind, WireHeaderKind(2)); + assert_eq!(entry.value, b"val"); + assert!(iter.next().is_none()); + } + + #[test] + fn multiple_pairs() { + let entries = [ + (1u8, b"k1" as &[u8], 2u8, b"v1" as &[u8]), + ( + 6u8, + &42i32.to_le_bytes() as &[u8], + 12u8, 
+ &99u64.to_le_bytes() as &[u8], + ), + (3u8, &[1u8] as &[u8], 9u8, &[255u8] as &[u8]), + ]; + let encoded = encode_test_entries(&entries); + + assert_eq!(validate_user_headers(&encoded).unwrap(), 3); + + let items: Vec<_> = WireUserHeaderIterator::new(&encoded).collect(); + assert_eq!(items.len(), 3); + assert_eq!(items[0].key, b"k1"); + assert_eq!(items[1].key, &42i32.to_le_bytes()); + assert_eq!(items[2].value, &[255u8]); + } + + #[test] + fn unknown_kind_codes_pass_validation() { + let entries = [(16u8, b"a" as &[u8], 255u8, b"b" as &[u8])]; + let encoded = encode_test_entries(&entries); + assert_eq!(validate_user_headers(&encoded).unwrap(), 1); + + let entry = WireUserHeaderIterator::new(&encoded).next().unwrap(); + assert_eq!(entry.key_kind, WireHeaderKind(16)); + assert!(!entry.key_kind.is_known()); + assert_eq!(entry.value_kind, WireHeaderKind(255)); + assert!(!entry.value_kind.is_known()); + } + + #[test] + fn known_kind_codes() { + for code in 1..=15u8 { + assert!(WireHeaderKind(code).is_known()); + } + assert!(!WireHeaderKind(0).is_known()); + assert!(!WireHeaderKind(16).is_known()); + } + + #[test] + fn truncated_at_kind_byte() { + let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])]; + let encoded = encode_test_entries(&entries); + // Truncate to just the first kind byte and length, missing data + let result = validate_user_headers(&encoded[..3]); + assert!(result.is_err()); + } + + #[test] + fn truncated_at_length_field() { + let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])]; + let encoded = encode_test_entries(&entries); + // Only kind byte, partial length + let result = validate_user_headers(&encoded[..2]); + assert!(result.is_err()); + } + + #[test] + fn truncated_at_data() { + let entries = [(1u8, b"longkey" as &[u8], 2u8, b"v" as &[u8])]; + let encoded = encode_test_entries(&entries); + // Kind + length present, but data truncated + let result = validate_user_headers(&encoded[..7]); + assert!(result.is_err()); + } + + #[test] + fn 
zero_length_rejected() { + // kind=1, length=0 + let buf = [1u8, 0, 0, 0, 0]; + let result = validate_user_headers(&buf); + assert!(matches!(result, Err(WireError::Validation(_)))); + } + + #[test] + fn length_exceeds_max_rejected() { + // kind=1, length=256 + let mut buf = vec![1u8]; + buf.extend_from_slice(&256u32.to_le_bytes()); + buf.extend_from_slice(&[0u8; 256]); + let result = validate_user_headers(&buf); + assert!(matches!(result, Err(WireError::Validation(_)))); + } + + #[test] + fn kind_zero_rejected() { + // kind=0 is reserved + let mut buf = vec![0u8]; + buf.extend_from_slice(&1u32.to_le_bytes()); + buf.push(42); + let result = validate_user_headers(&buf); + assert!(matches!(result, Err(WireError::Validation(_)))); + } + + #[test] + fn trailing_bytes_rejected() { + let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])]; + let mut encoded = encode_test_entries(&entries); + encoded.push(0xFF); // trailing garbage + let result = validate_user_headers(&encoded); + assert!(result.is_err()); + } + + #[test] + fn odd_tlv_count_rejected() { + // One TLV entry (key without value) - odd count + let mut buf = vec![1u8]; + buf.extend_from_slice(&2u32.to_le_bytes()); + buf.extend_from_slice(b"ab"); + let result = validate_user_headers(&buf); + assert!(matches!(result, Err(WireError::Validation(_)))); + } + + #[test] + fn encoded_size_matches_actual() { + let entries = [ + (1u8, b"key1" as &[u8], 2u8, b"value1" as &[u8]), + ( + 6u8, + &100i32.to_le_bytes() as &[u8], + 14u8, + &std::f32::consts::PI.to_le_bytes() as &[u8], + ), + ]; + let expected_size = user_headers_encoded_size(&entries); + let encoded = encode_test_entries(&entries); + assert_eq!(expected_size, encoded.len()); + } + + #[test] + fn max_length_field_accepted() { + let data = [0xAA; 255]; + let entries = [(1u8, &data[..], 1u8, &data[..])]; + let encoded = encode_test_entries(&entries); + assert_eq!(validate_user_headers(&encoded).unwrap(), 1); + } + + #[test] + fn wire_user_headers_empty() { + let 
wire = WireUserHeaders::empty(); + assert!(wire.is_empty()); + assert_eq!(wire.as_bytes(), &[]); + assert_eq!(wire.encoded_size(), 0); + } + + #[test] + fn wire_user_headers_from_slice_round_trip() { + let entries = [(6u8, b"test" as &[u8], 2u8, b"val" as &[u8])]; + let encoded = encode_test_entries(&entries); + + let wire = WireUserHeaders::from_slice(&encoded).unwrap(); + assert!(!wire.is_empty()); + assert_eq!(wire.as_bytes(), &encoded); + assert_eq!(wire.encoded_size(), encoded.len()); + + let entry = wire.iter().next().unwrap(); + assert_eq!(entry.key_kind, WireHeaderKind(6)); + assert_eq!(entry.key, b"test"); + } + + #[test] + fn wire_user_headers_from_bytes_zero_copy() { + let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])]; + let encoded = Bytes::from(encode_test_entries(&entries)); + + let wire = WireUserHeaders::from_bytes(encoded.clone()).unwrap(); + assert_eq!(wire.as_bytes(), &encoded[..]); + assert_eq!(wire.into_bytes(), encoded); + } + + #[test] + fn wire_user_headers_rejects_invalid() { + let buf = [0u8, 1, 0, 0, 0, 42]; // kind=0 is invalid + assert!(WireUserHeaders::from_slice(&buf).is_err()); + } + + #[test] + fn wire_user_headers_encode() { + let entries = [(1u8, b"k" as &[u8], 2u8, b"v" as &[u8])]; + let encoded = encode_test_entries(&entries); + let wire = WireUserHeaders::from_slice(&encoded).unwrap(); + + let mut buf = BytesMut::with_capacity(wire.encoded_size()); + wire.encode(&mut buf); + assert_eq!(&buf[..], &encoded); + } +} diff --git a/core/cli/Cargo.toml b/core/cli/Cargo.toml index 3d2044066..5fabc500d 100644 --- a/core/cli/Cargo.toml +++ b/core/cli/Cargo.toml @@ -54,6 +54,7 @@ comfy-table = { workspace = true } dirs = { workspace = true } figlet-rs = { workspace = true } iggy = { workspace = true } +iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } keyring = { workspace = true, optional = true } passterm = { workspace = true } diff --git a/core/cli/src/commands/binary_message/poll_messages.rs 
b/core/cli/src/commands/binary_message/poll_messages.rs index 1b0a18422..3274bb943 100644 --- a/core/cli/src/commands/binary_message/poll_messages.rs +++ b/core/cli/src/commands/binary_message/poll_messages.rs @@ -20,10 +20,12 @@ use crate::commands::cli_command::{CliCommand, PRINT_TARGET}; use anyhow::Context; use async_trait::async_trait; use comfy_table::{Cell, CellAlignment, Row, Table}; +use iggy_binary_protocol::WireUserHeaders; use iggy_common::Client; use iggy_common::{ Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, IggyMessage, - IggyTimestamp, PollMessages, PollingStrategy, Sizeable, user_headers_from_bytes, + IggyTimestamp, PollMessages, PollingStrategy, Sizeable, + wire_conversions::user_headers_from_wire, }; use std::collections::HashSet; use tokio::io::AsyncWriteExt; @@ -85,7 +87,10 @@ impl PollMessagesCmd { .iter() .flat_map(|m| { if let Some(user_headers) = &m.user_headers { - match user_headers_from_bytes(user_headers.clone()) { + match WireUserHeaders::from_bytes(user_headers.clone()) + .map_err(|_| iggy_common::IggyError::InvalidHeaderKey) + .and_then(|w| user_headers_from_wire(&w)) + { Ok(headers) => headers .iter() .map(|(k, v)| (k.clone(), v.kind())) diff --git a/core/cli/src/commands/binary_message/send_messages.rs b/core/cli/src/commands/binary_message/send_messages.rs index ebbd13c5d..99d40ebda 100644 --- a/core/cli/src/commands/binary_message/send_messages.rs +++ b/core/cli/src/commands/binary_message/send_messages.rs @@ -22,7 +22,7 @@ use async_trait::async_trait; use bytes::Bytes; use iggy_common::Client; use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, Partitioning, Sizeable}; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::io::{self, Read}; use tokio::io::AsyncReadExt; use tracing::{Level, event}; @@ -73,7 +73,7 @@ impl SendMessagesCmd { Ok(buffer) } - fn get_headers(&self) -> Option<HashMap<HeaderKey, HeaderValue>> { + fn get_headers(&self) -> 
Option<BTreeMap<HeaderKey, HeaderValue>> { match self.headers.len() { 0 => None, _ => Some(self.headers.iter().cloned().collect()), diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index dca0f75c2..fca1ed1e8 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -388,6 +388,8 @@ pub enum IggyError { InvalidMessageTimestampDelta(u64) = 4038, #[error("Invalid batch checksum: {0}, expected: {1}, for base offset: {2}")] InvalidBatchChecksum(u64, u64, u64) = 4039, + #[error("Invalid header kind code: {0}")] + InvalidHeaderKind(u8) = 4040, #[error("Cannot sed messages due to client disconnection")] CannotSendMessagesDueToClientDisconnection = 4050, #[error("Background send error")] diff --git a/core/common/src/http/messages/send_messages.rs b/core/common/src/http/messages/send_messages.rs index d01049dae..445005f64 100644 --- a/core/common/src/http/messages/send_messages.rs +++ b/core/common/src/http/messages/send_messages.rs @@ -29,7 +29,7 @@ use bytes::Bytes; use serde::de::{self, MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::fmt::Formatter; /// `SendMessages` command is used to send messages to a topic in a stream. 
@@ -233,7 +233,7 @@ impl<'de> Deserialize<'de> for SendMessages { "Invalid headers format: {e}" )) })?; - let mut map = HashMap::new(); + let mut map = BTreeMap::new(); for entry in entries { map.insert(entry.key, entry.value); } diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 914fe27dd..1eabac333 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -17,19 +17,19 @@ */ use super::message_header::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader}; -use super::user_headers::{get_user_headers_size, user_headers_from_bytes, user_headers_to_bytes}; use crate::Sizeable; use crate::error::IggyError; use crate::utils::byte_size::IggyByteSize; use crate::utils::timestamp::IggyTimestamp; +use crate::wire_conversions::{user_headers_from_wire, user_headers_to_wire}; use crate::{HeaderKey, HeaderValue}; use bon::bon; use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::WireUserHeaders; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::convert::TryFrom; use std::str::FromStr; -use tracing::warn; /// Maximum allowed size in bytes for a message payload. 
/// @@ -71,7 +71,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000; /// ``` /// use iggy_common::*; /// use std::str::FromStr; -/// use std::collections::HashMap; +/// use std::collections::BTreeMap; /// use bytes::Bytes; /// /// // Create a simple text message @@ -96,7 +96,7 @@ pub const MAX_USER_HEADERS_SIZE: u32 = 100 * 1000; /// // Create a message with headers /// let key = HeaderKey::from_str("content-type").unwrap(); /// let value = HeaderValue::from_str("text/plain").unwrap(); -/// let user_headers = HashMap::from([(key, value)]); +/// let user_headers = BTreeMap::from([(key, value)]); /// /// let message = IggyMessage::builder() /// .payload("Message with metadata".into()) @@ -135,7 +135,7 @@ impl IggyMessage { /// ``` /// use iggy_common::*; /// use bytes::Bytes; - /// use std::collections::HashMap; + /// use std::collections::BTreeMap; /// use std::str::FromStr; /// /// // Simple message with just payload @@ -154,7 +154,7 @@ impl IggyMessage { /// // Message with headers /// let key = HeaderKey::from_str("content-type").unwrap(); /// let value = HeaderValue::from_str("text/plain").unwrap(); - /// let user_headers = HashMap::from([(key, value)]); + /// let user_headers = BTreeMap::from([(key, value)]); /// let msg = IggyMessage::builder() /// .payload("Hello".into()) /// .user_headers(user_headers) @@ -165,7 +165,7 @@ impl IggyMessage { pub fn new( id: Option<u128>, payload: Bytes, - user_headers: Option<HashMap<HeaderKey, HeaderValue>>, + user_headers: Option<BTreeMap<HeaderKey, HeaderValue>>, ) -> Result<Self, IggyError> { if payload.is_empty() { return Err(IggyError::InvalidMessagePayloadLength); @@ -175,7 +175,10 @@ impl IggyMessage { return Err(IggyError::TooBigMessagePayload); } - let user_headers_length = get_user_headers_size(&user_headers).unwrap_or(0); + let wire_headers = user_headers.as_ref().map(user_headers_to_wire); + let user_headers_length = wire_headers + .as_ref() + .map_or(0, |w| w.as_bytes().len() as u32); if user_headers_length 
> MAX_USER_HEADERS_SIZE { return Err(IggyError::TooBigUserHeaders); @@ -192,7 +195,7 @@ impl IggyMessage { reserved: 0, }; - let user_headers = user_headers.map(|h| user_headers_to_bytes(&h)); + let user_headers = wire_headers.map(|w| w.into_bytes()); Ok(Self { header, @@ -203,13 +206,13 @@ impl IggyMessage { } impl IggyMessage { - /// Gets the user headers as a typed HashMap. + /// Gets the user headers as a typed BTreeMap. /// - /// This method parses the binary header data into a typed HashMap for easy access. + /// This method parses the binary header data into a typed BTreeMap for easy access. /// /// # Returns /// - /// * `Ok(Some(HashMap))` - Successfully parsed headers + /// * `Ok(Some(BTreeMap))` - Successfully parsed headers /// * `Ok(None)` - No headers present /// * `Err(IggyError)` - Error parsing headers /// @@ -218,11 +221,11 @@ impl IggyMessage { /// ``` /// use iggy_common::*; /// use std::str::FromStr; - /// use std::collections::HashMap; + /// use std::collections::BTreeMap; /// /// let key = HeaderKey::from_str("content-type").unwrap(); /// let value = HeaderValue::from_str("text/plain").unwrap(); - /// let user_headers_map = HashMap::from([(key.clone(), value)]); + /// let user_headers_map = BTreeMap::from([(key.clone(), value)]); /// /// let message = IggyMessage::builder() /// .payload("Hello".into()) @@ -233,21 +236,22 @@ impl IggyMessage { /// let headers = message.user_headers_map().unwrap().unwrap(); /// assert!(headers.contains_key(&key)); /// ``` - pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> { + pub fn user_headers_map(&self) -> Result<Option<BTreeMap<HeaderKey, HeaderValue>>, IggyError> { if let Some(user_headers) = &self.user_headers { - match user_headers_from_bytes(user_headers.clone()) { - Ok(h) => Ok(Some(h)), + let wire = match WireUserHeaders::from_bytes(user_headers.clone()) { + Ok(w) => w, Err(e) => { - warn!( - "Failed to deserialize user headers: {e} for message at offset {}, 
sent at: {} ({}), user_headers_length: {}, skipping field...", + tracing::warn!( + "Failed to parse user headers for message at offset {}, \ + user_headers_length: {}: {e}", self.header.offset, - IggyTimestamp::from(self.header.origin_timestamp).to_rfc3339_string(), - self.header.origin_timestamp, self.header.user_headers_length ); - Ok(None) + return Ok(None); } - } + }; + let map = user_headers_from_wire(&wire)?; + Ok(Some(map)) } else { Ok(None) } @@ -264,7 +268,7 @@ impl IggyMessage { /// # Returns /// /// * `Ok(Some(HeaderValue))` - User header found with its value - /// * `Ok(None)` - User header not found or user headers couldn't be parsed + /// * `Ok(None)` - User header not found or no user headers present /// * `Err(IggyError)` - Error accessing user headers /// /// # Examples @@ -272,11 +276,11 @@ impl IggyMessage { /// ``` /// use iggy_common::*; /// use std::str::FromStr; - /// use std::collections::HashMap; + /// use std::collections::BTreeMap; /// /// let key = HeaderKey::from_str("content-type").unwrap(); /// let value = HeaderValue::from_str("text/plain").unwrap(); - /// let user_headers_map = HashMap::from([(key.clone(), value.clone())]); + /// let user_headers_map = BTreeMap::from([(key.clone(), value.clone())]); /// /// let message = IggyMessage::builder() /// .payload("Hello".into()) @@ -310,11 +314,11 @@ impl IggyMessage { /// ``` /// use iggy_common::*; /// use std::str::FromStr; - /// use std::collections::HashMap; + /// use std::collections::BTreeMap; /// /// let key = HeaderKey::from_str("content-type").unwrap(); /// let value = HeaderValue::from_str("text/plain").unwrap(); - /// let user_headers_map = HashMap::from([(key.clone(), value)]); + /// let user_headers_map = BTreeMap::from([(key.clone(), value)]); /// /// let message = IggyMessage::builder() /// .payload("Hello".into()) @@ -597,7 +601,7 @@ impl<'de> Deserialize<'de> for IggyMessage { { let mut header: Option<IggyMessageHeader> = None; let mut payload: Option<Bytes> = None; - let 
mut user_headers: Option<HashMap<HeaderKey, HeaderValue>> = None; + let mut user_headers: Option<BTreeMap<HeaderKey, HeaderValue>> = None; let mut raw_user_headers: Option<Bytes> = None; while let Some(key) = map.next_key::<String>()? { @@ -630,7 +634,7 @@ impl<'de> Deserialize<'de> for IggyMessage { .map_err(|e| { de::Error::custom(format!("Invalid headers format: {e}")) })?; - let mut headers_map = HashMap::new(); + let mut headers_map = BTreeMap::new(); for entry in entries { headers_map.insert(entry.key, entry.value); } @@ -649,7 +653,7 @@ impl<'de> Deserialize<'de> for IggyMessage { let user_headers_bytes = if let Some(raw) = raw_user_headers { Some(raw) } else { - user_headers.map(|headers| user_headers_to_bytes(&headers)) + user_headers.map(|headers| user_headers_to_wire(&headers).into_bytes()) }; let user_headers_length = user_headers_bytes @@ -704,7 +708,7 @@ mod tests { #[test] fn test_create_with_headers() { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("content-type").unwrap(), HeaderValue::try_from("text/plain").unwrap(), @@ -782,7 +786,7 @@ mod tests { #[test] fn test_json_serialization_with_headers() { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("content-type").unwrap(), HeaderValue::try_from("text/plain").unwrap(), diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index c3db53154..0706b047d 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -19,14 +19,14 @@ use super::HeaderValue; use super::message_boundaries::IggyMessageBoundaries; use super::message_header::*; -use super::user_headers::user_headers_from_bytes; use crate::IggyByteSize; use crate::Sizeable; use crate::error::IggyError; use crate::utils::checksum; +use crate::wire_conversions::user_headers_from_wire; use crate::{HeaderKey, 
IggyMessageHeaderView}; -use bytes::Bytes; -use std::collections::HashMap; +use iggy_binary_protocol::WireUserHeaders; +use std::collections::BTreeMap; use std::num::NonZeroUsize; /// A immutable view of a message. @@ -86,22 +86,20 @@ impl<'a> IggyMessageView<'a> { } /// Return instantiated user headers map - pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> { + pub fn user_headers_map(&self) -> Result<Option<BTreeMap<HeaderKey, HeaderValue>>, IggyError> { if let Some(headers) = self.user_headers() { - let headers_bytes = Bytes::copy_from_slice(headers); - - match user_headers_from_bytes(headers_bytes) { - Ok(h) => Ok(Some(h)), + let wire = match WireUserHeaders::from_slice(headers) { + Ok(w) => w, Err(e) => { - tracing::error!( - "Error parsing headers: {}, header_length={}", - e, + tracing::warn!( + "Failed to parse user headers: {e}, header_length={}", self.header().user_headers_length() ); - - Ok(None) + return Ok(None); } - } + }; + let map = user_headers_from_wire(&wire)?; + Ok(Some(map)) } else { Ok(None) } diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index bd5688926..281cc3273 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -71,6 +71,5 @@ pub use polling_kind::PollingKind; pub use polling_strategy::PollingStrategy; pub use user_headers::{ HeaderEntry, HeaderField, HeaderKey, HeaderKind, HeaderValue, KeyMarker, UserHeaders, - ValueMarker, deserialize_headers, serialize_headers, user_headers_from_bytes, - user_headers_to_bytes, + ValueMarker, deserialize_headers, serialize_headers, }; diff --git a/core/common/src/types/message/user_headers.rs b/core/common/src/types/message/user_headers.rs index 396608199..72028464f 100644 --- a/core/common/src/types/message/user_headers.rs +++ b/core/common/src/types/message/user_headers.rs @@ -17,11 +17,11 @@ */ use crate::error::IggyError; -use bytes::{BufMut, Bytes, BytesMut}; +use 
bytes::Bytes; use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; use serde_with::serde_as; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; use std::marker::PhantomData; @@ -84,7 +84,7 @@ pub type HeaderKey = HeaderField<KeyMarker>; pub type HeaderValue = HeaderField<ValueMarker>; /// Type alias for a collection of user-defined message headers. -pub type UserHeaders = HashMap<HeaderKey, HeaderValue>; +pub type UserHeaders = BTreeMap<HeaderKey, HeaderValue>; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct KeyMarker; @@ -266,11 +266,11 @@ impl HeaderKind { 13 => Ok(HeaderKind::Uint128), 14 => Ok(HeaderKind::Float32), 15 => Ok(HeaderKind::Float64), - _ => Err(IggyError::InvalidCommand), + _ => Err(IggyError::InvalidHeaderKind(code)), } } - fn expected_size(&self) -> Option<usize> { + pub(crate) fn expected_size(&self) -> Option<usize> { match self { HeaderKind::Raw | HeaderKind::String => None, HeaderKind::Bool | HeaderKind::Int8 | HeaderKind::Uint8 => Some(1), @@ -282,6 +282,18 @@ impl HeaderKind { } } +impl Ord for HeaderKind { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.as_code().cmp(&other.as_code()) + } +} + +impl PartialOrd for HeaderKind { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + impl FromStr for HeaderKind { type Err = IggyError; fn from_str(s: &str) -> Result<Self, Self::Err> { @@ -598,15 +610,27 @@ impl<T> HeaderField<T> { } } - fn new_unchecked(kind: HeaderKind, value: &[u8]) -> Self { + pub(crate) fn new_unchecked(kind: HeaderKind, value: &[u8]) -> Self { Self { kind, - value: Bytes::from(value.to_vec()), + value: Bytes::copy_from_slice(value), _marker: PhantomData, } } } +impl<T: Eq> Ord for HeaderField<T> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (self.kind.as_code(), &self.value).cmp(&(other.kind.as_code(), &other.value)) + } +} + +impl<T: Eq> 
PartialOrd for HeaderField<T> { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + impl<T> TryFrom<&str> for HeaderField<T> { type Error = IggyError; fn try_from(value: &str) -> Result<Self, Self::Error> { @@ -929,110 +953,6 @@ impl<T> TryFrom<&HeaderField<T>> for Vec<u8> { } } -pub fn user_headers_to_bytes(headers: &HashMap<HeaderKey, HeaderValue>) -> Bytes { - if headers.is_empty() { - return Bytes::new(); - } - - let mut bytes = BytesMut::new(); - for (key, value) in headers { - bytes.put_u8(key.kind().as_code()); - #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(key.as_bytes().len() as u32); - bytes.put_slice(key.as_bytes()); - bytes.put_u8(value.kind().as_code()); - #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(value.as_bytes().len() as u32); - bytes.put_slice(value.as_bytes()); - } - - bytes.freeze() -} - -pub fn user_headers_from_bytes(bytes: Bytes) -> Result<HashMap<HeaderKey, HeaderValue>, IggyError> { - if bytes.is_empty() { - return Ok(HashMap::new()); - } - - let mut headers = HashMap::new(); - let mut position = 0; - while position < bytes.len() { - let key_kind = HeaderKind::from_code(bytes[position])?; - position += 1; - - if position + 4 > bytes.len() { - return Err(IggyError::InvalidHeaderKey); - } - let key_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - if key_length == 0 || key_length > 255 { - return Err(IggyError::InvalidHeaderKey); - } - position += 4; - - if position + key_length > bytes.len() { - return Err(IggyError::InvalidHeaderKey); - } - if let Some(expected) = key_kind.expected_size() - && key_length != expected - { - return Err(IggyError::InvalidHeaderKey); - } - let key_value = bytes[position..position + key_length].to_vec(); - position += key_length; - - if position >= bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - let value_kind = 
HeaderKind::from_code(bytes[position])?; - position += 1; - - if position + 4 > bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - let value_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - if value_length == 0 || value_length > 255 { - return Err(IggyError::InvalidHeaderValue); - } - position += 4; - - if position + value_length > bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - if let Some(expected) = value_kind.expected_size() - && value_length != expected - { - return Err(IggyError::InvalidHeaderValue); - } - let value_value = bytes[position..position + value_length].to_vec(); - position += value_length; - - headers.insert( - HeaderKey::new_unchecked(key_kind, &key_value), - HeaderValue::new_unchecked(value_kind, &value_value), - ); - } - - Ok(headers) -} - -pub fn get_user_headers_size(headers: &Option<HashMap<HeaderKey, HeaderValue>>) -> Option<u32> { - let mut size = 0; - if let Some(headers) = headers { - for (key, value) in headers { - size += 1 + 4 + key.as_bytes().len() as u32 + 1 + 4 + value.as_bytes().len() as u32; - } - } - Some(size) -} - #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct HeaderEntry { pub key: HeaderKey, @@ -1080,6 +1000,9 @@ where #[cfg(test)] mod tests { use super::*; + use crate::wire_conversions::{user_headers_from_wire, user_headers_to_wire}; + use bytes::{BufMut, BytesMut}; + use iggy_binary_protocol::WireUserHeaders; #[test] fn header_key_should_be_created_for_valid_value() { @@ -1356,7 +1279,7 @@ mod tests { #[test] fn should_be_serialized_as_bytes() { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key-1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), @@ -1364,7 +1287,7 @@ mod tests { headers.insert(HeaderKey::try_from("key 1").unwrap(), 12345u64.into()); headers.insert(HeaderKey::try_from("key_3").unwrap(), 
true.into()); - let bytes = user_headers_to_bytes(&headers); + let bytes = user_headers_to_wire(&headers).into_bytes(); let mut position = 0; let mut headers_count = 0; @@ -1403,7 +1326,7 @@ mod tests { #[test] fn should_be_deserialized_from_bytes() { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key-1").unwrap(), HeaderValue::from_str("Value 1").unwrap(), @@ -1421,7 +1344,9 @@ mod tests { bytes.put_slice(&value.value); } - let deserialized_headers = user_headers_from_bytes(bytes.freeze()); + let frozen = bytes.freeze(); + let wire = WireUserHeaders::from_slice(&frozen).unwrap(); + let deserialized_headers = user_headers_from_wire(&wire); assert!(deserialized_headers.is_ok()); let deserialized_headers = deserialized_headers.unwrap(); @@ -1438,15 +1363,16 @@ mod tests { #[test] fn should_serialize_and_deserialize_typed_keys() { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( 123i32.into(), HeaderValue::from_str("Value for int key").unwrap(), ); headers.insert(999u64.into(), true.into()); - let bytes = user_headers_to_bytes(&headers); - let deserialized = user_headers_from_bytes(bytes).unwrap(); + let bytes = user_headers_to_wire(&headers).into_bytes(); + let wire = WireUserHeaders::from_slice(&bytes).unwrap(); + let deserialized = user_headers_from_wire(&wire).unwrap(); assert_eq!(deserialized.len(), headers.len()); for (key, value) in &headers { @@ -1655,7 +1581,7 @@ mod tests { bytes.put_u32_le(100); // key_len = 100 (lie!) 
bytes.put_slice(b"abc"); // only 3 bytes of key data - let result = user_headers_from_bytes(bytes.freeze()); + let result = WireUserHeaders::from_slice(&bytes.freeze()); assert!(result.is_err()); } @@ -1682,7 +1608,9 @@ mod tests { bytes.put_u32_le(4); // value_len = 4 (wrong, should be 2) bytes.put_slice(&[1, 2, 3, 4]); - let result = user_headers_from_bytes(bytes.freeze()); + let frozen = bytes.freeze(); + let wire = WireUserHeaders::from_slice(&frozen).unwrap(); + let result = user_headers_from_wire(&wire); assert!(result.is_err()); } @@ -1697,7 +1625,7 @@ mod tests { bytes.put_u8(6); // value_kind = Int32 // Missing value length bytes - let result = user_headers_from_bytes(bytes.freeze()); + let result = WireUserHeaders::from_slice(&bytes.freeze()); assert!(result.is_err()); } diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs index 24608cb62..621f4e3bd 100644 --- a/core/common/src/utils/crypto.rs +++ b/core/common/src/utils/crypto.rs @@ -130,12 +130,12 @@ mod tests { #[test] fn message_payload_and_headers_should_encrypt_and_decrypt_correctly() { use bytes::Bytes; - use std::collections::HashMap; + use std::collections::BTreeMap; let key = [1; 32]; let encryptor = Aes256GcmEncryptor::new(&key).unwrap(); - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("batch").unwrap(), HeaderValue::from(1u64), diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs index fc2cc2333..35dfcfe9e 100644 --- a/core/common/src/wire_conversions.rs +++ b/core/common/src/wire_conversions.rs @@ -26,10 +26,10 @@ use crate::{ CacheMetrics, CacheMetricsKey, ClientInfo, ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroup, ConsumerGroupDetails, ConsumerGroupInfo, ConsumerGroupMember, ConsumerOffsetInfo, - GlobalPermissions, IdKind, IdentityInfo, IggyByteSize, IggyError, IggyExpiry, MaxTopicSize, - 
Partition, Permissions, PersonalAccessTokenInfo, RawPersonalAccessToken, Stats, Stream, - StreamDetails, StreamPermissions, Topic, TopicDetails, TopicPermissions, TransportEndpoints, - UserInfo, UserInfoDetails, UserStatus, + GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, IdKind, IdentityInfo, IggyByteSize, + IggyError, IggyExpiry, MaxTopicSize, Partition, Permissions, PersonalAccessTokenInfo, + RawPersonalAccessToken, Stats, Stream, StreamDetails, StreamPermissions, Topic, TopicDetails, + TopicPermissions, TransportEndpoints, UserInfo, UserInfoDetails, UserStatus, }; use iggy_binary_protocol::WireConsumer; use iggy_binary_protocol::primitives::permissions::{ @@ -716,3 +716,70 @@ fn topic_permissions_to_wire(topic_id: usize, tp: &TopicPermissions) -> WireTopi send_messages: tp.send_messages, } } + +// -- User Headers conversions -- + +/// Encode domain user headers into a [`WireUserHeaders`] wrapper. +pub fn user_headers_to_wire( + headers: &BTreeMap<HeaderKey, HeaderValue>, +) -> iggy_binary_protocol::WireUserHeaders { + use bytes::{BufMut, BytesMut}; + use iggy_binary_protocol::WireUserHeaders; + + if headers.is_empty() { + return WireUserHeaders::empty(); + } + let size: usize = headers + .iter() + .map(|(k, v)| 1 + 4 + k.as_bytes().len() + 1 + 4 + v.as_bytes().len()) + .sum(); + let mut buf = BytesMut::with_capacity(size); + for (key, value) in headers { + buf.put_u8(key.kind().as_code()); + #[allow(clippy::cast_possible_truncation)] + buf.put_u32_le(key.as_bytes().len() as u32); + buf.put_slice(key.as_bytes()); + buf.put_u8(value.kind().as_code()); + #[allow(clippy::cast_possible_truncation)] + buf.put_u32_le(value.as_bytes().len() as u32); + buf.put_slice(value.as_bytes()); + } + // Buffer was just encoded from valid HeaderKey/HeaderValue entries, + // so structural TLV validity is guaranteed by construction. + WireUserHeaders::from_validated(buf.freeze()) +} + +/// Decode a [`WireUserHeaders`] wrapper into domain user headers. 
+/// +/// Wire-level validation accepts unknown kind codes for forward compatibility +/// (VSR rolling upgrades). Domain-level `from_code()` rejects them - the wire +/// layer stores structurally valid data, the domain layer requires known semantics. +pub fn user_headers_from_wire( + wire: &iggy_binary_protocol::WireUserHeaders, +) -> Result<BTreeMap<HeaderKey, HeaderValue>, IggyError> { + if wire.is_empty() { + return Ok(BTreeMap::new()); + } + let mut headers = BTreeMap::new(); + for entry in wire.iter() { + let key_kind = HeaderKind::from_code(entry.key_kind.0)?; + if let Some(expected) = key_kind.expected_size() + && entry.key.len() != expected + { + return Err(IggyError::InvalidHeaderKey); + } + + let value_kind = HeaderKind::from_code(entry.value_kind.0)?; + if let Some(expected) = value_kind.expected_size() + && entry.value.len() != expected + { + return Err(IggyError::InvalidHeaderValue); + } + + headers.insert( + HeaderKey::new_unchecked(key_kind, entry.key), + HeaderValue::new_unchecked(value_kind, entry.value), + ); + } + Ok(headers) +} diff --git a/core/connectors/runtime/src/sink.rs b/core/connectors/runtime/src/sink.rs index 48961aca6..b6c8965d3 100644 --- a/core/connectors/runtime/src/sink.rs +++ b/core/connectors/runtime/src/sink.rs @@ -452,7 +452,7 @@ async fn process_messages( checksum: message.header.checksum, timestamp: message.header.timestamp, origin_timestamp: message.header.origin_timestamp, - headers: message.user_headers_map().unwrap_or_default(), + headers: message.user_headers_map().unwrap_or(None), payload: message.payload.into(), }); diff --git a/core/connectors/runtime/src/source.rs b/core/connectors/runtime/src/source.rs index 51ec06026..c4af20450 100644 --- a/core/connectors/runtime/src/source.rs +++ b/core/connectors/runtime/src/source.rs @@ -30,7 +30,7 @@ use iggy_connector_sdk::{ }; use once_cell::sync::Lazy; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, str::FromStr, sync::{Arc, atomic::Ordering}, }; @@ 
-567,7 +567,7 @@ pub(crate) extern "C" fn handle_produced_messages( fn build_iggy_message( payload: Vec<u8>, id: Option<u128>, - headers: Option<HashMap<HeaderKey, HeaderValue>>, + headers: Option<BTreeMap<HeaderKey, HeaderValue>>, ) -> Result<IggyMessage, IggyError> { match (id, headers) { (Some(id), Some(h)) => IggyMessage::builder() diff --git a/core/connectors/sdk/src/lib.rs b/core/connectors/sdk/src/lib.rs index 9a372de10..a864407be 100644 --- a/core/connectors/sdk/src/lib.rs +++ b/core/connectors/sdk/src/lib.rs @@ -31,7 +31,7 @@ use iggy::prelude::{HeaderKey, HeaderValue}; use once_cell::sync::OnceCell; use prost::Message; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc}; use strum_macros::{Display, IntoStaticStr}; use thiserror::Error; use tokio::runtime::Runtime; @@ -292,7 +292,7 @@ pub struct ReceivedMessage { pub checksum: u64, pub timestamp: u64, pub origin_timestamp: u64, - pub headers: Option<HashMap<HeaderKey, HeaderValue>>, + pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>, pub payload: Vec<u8>, } @@ -311,7 +311,7 @@ pub struct ProducedMessage { pub checksum: Option<u64>, pub timestamp: Option<u64>, pub origin_timestamp: Option<u64>, - pub headers: Option<HashMap<HeaderKey, HeaderValue>>, + pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>, pub payload: Vec<u8>, } @@ -323,7 +323,7 @@ pub struct DecodedMessage { pub checksum: Option<u64>, pub timestamp: Option<u64>, pub origin_timestamp: Option<u64>, - pub headers: Option<HashMap<HeaderKey, HeaderValue>>, + pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>, pub payload: Payload, } @@ -354,7 +354,7 @@ pub struct ConsumedMessage { pub checksum: u64, pub timestamp: u64, pub origin_timestamp: u64, - pub headers: Option<HashMap<HeaderKey, HeaderValue>>, + pub headers: Option<BTreeMap<HeaderKey, HeaderValue>>, pub payload: Payload, } diff --git a/core/connectors/sinks/http_sink/src/lib.rs 
b/core/connectors/sinks/http_sink/src/lib.rs index 03bc5ba44..6f86885b5 100644 --- a/core/connectors/sinks/http_sink/src/lib.rs +++ b/core/connectors/sinks/http_sink/src/lib.rs @@ -1259,6 +1259,7 @@ impl Sink for HttpSink { mod tests { use super::*; use iggy_connector_sdk::Schema; + use std::collections::BTreeMap; const FIELD_DATA: &str = "data"; const FIELD_PAYLOAD_ENCODING: &str = "iggy_payload_encoding"; @@ -1651,7 +1652,7 @@ mod tests { let topic_meta = given_topic_metadata(); let msg_meta = given_messages_metadata(); - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( "x-correlation-id".parse().unwrap(), "abc-123".parse().unwrap(), diff --git a/core/integration/tests/cli/message/test_message_poll_command.rs b/core/integration/tests/cli/message/test_message_poll_command.rs index 7d7532912..0ed93d217 100644 --- a/core/integration/tests/cli/message/test_message_poll_command.rs +++ b/core/integration/tests/cli/message/test_message_poll_command.rs @@ -25,7 +25,7 @@ use bytes::Bytes; use iggy::prelude::*; use predicates::str::{contains, starts_with}; use serial_test::parallel; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::str::FromStr; struct TestMessagePollCmd { @@ -143,7 +143,7 @@ impl IggyCmdTestCase for TestMessagePollCmd { let payload = Bytes::from(s.as_bytes().to_vec()); IggyMessage::builder() .payload(payload) - .user_headers(HashMap::from([self.headers.clone()])) + .user_headers(BTreeMap::from([self.headers.clone()])) .build() .expect("Failed to create message with headers") }) diff --git a/core/integration/tests/cli/message/test_message_poll_to_file_command.rs b/core/integration/tests/cli/message/test_message_poll_to_file_command.rs index 3f9d029e5..842b6f963 100644 --- a/core/integration/tests/cli/message/test_message_poll_to_file_command.rs +++ b/core/integration/tests/cli/message/test_message_poll_to_file_command.rs @@ -23,7 +23,7 @@ use bytes::Bytes; use iggy::prelude::*; use 
predicates::str::{contains, is_match, starts_with}; use serial_test::parallel; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::path::Path; use std::str::FromStr; @@ -33,7 +33,7 @@ pub(super) struct TestMessagePollToFileCmd<'a> { messages: Vec<&'a str>, message_count: usize, strategy: PollingStrategy, - headers: HashMap<HeaderKey, HeaderValue>, + headers: BTreeMap<HeaderKey, HeaderValue>, output_file: String, cleanup: bool, // These will be populated after creating the resources @@ -49,7 +49,7 @@ impl<'a> TestMessagePollToFileCmd<'a> { messages: &[&'a str], message_count: usize, strategy: PollingStrategy, - headers: HashMap<HeaderKey, HeaderValue>, + headers: BTreeMap<HeaderKey, HeaderValue>, output_file: &str, cleanup: bool, ) -> Self { @@ -240,7 +240,7 @@ pub async fn should_be_successful() { "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa", ]; - let test_headers = HashMap::from([ + let test_headers = BTreeMap::from([ ( HeaderKey::from_str("HeaderKey1").unwrap(), HeaderValue::from_str("HeaderValue1").unwrap(), diff --git a/core/integration/tests/cli/message/test_message_reply_via_file.rs b/core/integration/tests/cli/message/test_message_reply_via_file.rs index f4cbd6c6e..d65518c36 100644 --- a/core/integration/tests/cli/message/test_message_reply_via_file.rs +++ b/core/integration/tests/cli/message/test_message_reply_via_file.rs @@ -21,7 +21,7 @@ use crate::cli::message::test_message_poll_to_file_command::TestMessagePollToFil use crate::cli::message::test_message_send_from_file_command::TestMessageSendFromFileCmd; use iggy::prelude::*; use serial_test::parallel; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::str::FromStr; #[tokio::test] @@ -42,7 +42,7 @@ pub async fn should_be_successful() { "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa", ]; - let test_headers = HashMap::from([ + let test_headers = BTreeMap::from([ ( HeaderKey::from_str("HeaderKey1").unwrap(), 
HeaderValue::from_str("HeaderValue1").unwrap(), diff --git a/core/integration/tests/cli/message/test_message_send_command.rs b/core/integration/tests/cli/message/test_message_send_command.rs index 97ed4fe66..d6037446a 100644 --- a/core/integration/tests/cli/message/test_message_send_command.rs +++ b/core/integration/tests/cli/message/test_message_send_command.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use iggy::prelude::*; use predicates::str::diff; use serial_test::parallel; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::str::from_utf8; use twox_hash::XxHash32; @@ -58,7 +58,7 @@ struct TestMessageSendCmd { messages: Vec<String>, message_input: ProvideMessages, partitioning: PartitionSelection, - header: Option<HashMap<HeaderKey, HeaderValue>>, + header: Option<BTreeMap<HeaderKey, HeaderValue>>, // These will be populated after creating the resources actual_stream_id: Option<u32>, actual_topic_id: Option<u32>, @@ -72,7 +72,7 @@ impl TestMessageSendCmd { messages: Vec<String>, message_input: ProvideMessages, partitioning: PartitionSelection, - header: Option<HashMap<HeaderKey, HeaderValue>>, + header: Option<BTreeMap<HeaderKey, HeaderValue>>, ) -> Self { Self { stream_name, @@ -345,7 +345,7 @@ pub async fn should_be_successful() { ], message_input, using_partitioning, - Some(HashMap::from([ + Some(BTreeMap::from([ ( HeaderKey::try_from("key1").unwrap(), HeaderValue::try_from("value1").unwrap(), diff --git a/core/integration/tests/cli/message/test_message_send_from_file_command.rs b/core/integration/tests/cli/message/test_message_send_from_file_command.rs index 1089bd7ff..eb45d5487 100644 --- a/core/integration/tests/cli/message/test_message_send_from_file_command.rs +++ b/core/integration/tests/cli/message/test_message_send_from_file_command.rs @@ -23,7 +23,7 @@ use bytes::Bytes; use iggy::prelude::*; use predicates::str::{ends_with, is_match, starts_with}; use serial_test::parallel; -use std::collections::HashMap; +use 
std::collections::BTreeMap; use std::str::FromStr; use tokio::io::AsyncWriteExt; @@ -34,7 +34,7 @@ pub(super) struct TestMessageSendFromFileCmd<'a> { topic_name: String, messages: Vec<&'a str>, message_count: usize, - headers: HashMap<HeaderKey, HeaderValue>, + headers: BTreeMap<HeaderKey, HeaderValue>, // These will be populated after creating the resources actual_stream_id: Option<u32>, actual_topic_id: Option<u32>, @@ -49,7 +49,7 @@ impl<'a> TestMessageSendFromFileCmd<'a> { topic_name: &str, messages: &[&'a str], message_count: usize, - headers: HashMap<HeaderKey, HeaderValue>, + headers: BTreeMap<HeaderKey, HeaderValue>, ) -> Self { assert!(message_count <= messages.len()); Self { @@ -273,7 +273,7 @@ pub async fn should_be_successful() { "accusantium doloremque laudantium, totam rem aperiam, eaque ipsa", ]; - let test_headers = HashMap::from([ + let test_headers = BTreeMap::from([ ( HeaderKey::from_str("HeaderKey1").unwrap(), HeaderValue::from_str("HeaderValue1").unwrap(), diff --git a/core/integration/tests/server/scenarios/create_message_payload.rs b/core/integration/tests/server/scenarios/create_message_payload.rs index f44a67333..d19c7aee4 100644 --- a/core/integration/tests/server/scenarios/create_message_payload.rs +++ b/core/integration/tests/server/scenarios/create_message_payload.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use iggy::prelude::*; use integration::harness::{TestHarness, assert_clean_system}; -use std::collections::HashMap; +use std::collections::BTreeMap; const STREAM_NAME: &str = "test-stream"; const TOPIC_NAME: &str = "test-topic"; @@ -137,8 +137,8 @@ fn create_message_payload(offset: u64) -> Bytes { Bytes::from(format!("message {offset}")) } -fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> { - let mut headers = HashMap::new(); +fn create_message_headers() -> BTreeMap<HeaderKey, HeaderValue> { + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key_1").unwrap(), HeaderValue::try_from("Value 1").unwrap(), 
diff --git a/core/integration/tests/server/scenarios/encryption_scenario.rs b/core/integration/tests/server/scenarios/encryption_scenario.rs index 6881f5f8c..8778e8fbd 100644 --- a/core/integration/tests/server/scenarios/encryption_scenario.rs +++ b/core/integration/tests/server/scenarios/encryption_scenario.rs @@ -21,7 +21,7 @@ use iggy::prelude::*; use iggy_common::TransportProtocol; use integration::harness::{TestHarness, TestServerConfig}; use serial_test::parallel; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::path::{Path, PathBuf}; use std::sync::Arc; use test_case::test_matrix; @@ -63,7 +63,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp let mut messages_batch_1 = Vec::new(); for i in 0..messages_per_batch { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert(HeaderKey::try_from("batch").unwrap(), 1u64.into()); headers.insert(HeaderKey::try_from("index").unwrap(), i.into()); headers.insert( @@ -217,7 +217,7 @@ async fn should_fill_data_with_headers_and_verify_after_restart_using_api(encryp let mut messages_batch_2 = Vec::new(); for i in 0..messages_per_batch { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert(HeaderKey::try_from("batch").unwrap(), 2u64.into()); headers.insert(HeaderKey::try_from("index").unwrap(), i.into()); headers.insert( @@ -384,7 +384,7 @@ async fn should_encrypt_and_decrypt_headers_with_client_side_encryption( let mut messages = Vec::new(); for i in 0..10i64 { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert(HeaderKey::try_from("index").unwrap(), HeaderValue::from(i)); headers.insert( HeaderKey::try_from("transport").unwrap(), diff --git a/core/integration/tests/server/scenarios/message_headers_scenario.rs b/core/integration/tests/server/scenarios/message_headers_scenario.rs index 714c56247..057572407 100644 --- 
a/core/integration/tests/server/scenarios/message_headers_scenario.rs +++ b/core/integration/tests/server/scenarios/message_headers_scenario.rs @@ -22,7 +22,7 @@ use crate::server::scenarios::{ use bytes::Bytes; use iggy::prelude::*; use integration::harness::{TestHarness, assert_clean_system}; -use std::collections::HashMap; +use std::collections::BTreeMap; pub async fn run(harness: &TestHarness) { let client = harness @@ -129,8 +129,8 @@ fn create_message_payload(offset: u64) -> Bytes { Bytes::from(format!("message {offset}")) } -fn create_message_headers() -> HashMap<HeaderKey, HeaderValue> { - let mut headers = HashMap::new(); +fn create_message_headers() -> BTreeMap<HeaderKey, HeaderValue> { + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key_1").unwrap(), HeaderValue::try_from("Value 1").unwrap(), diff --git a/core/integration/tests/server/scenarios/message_size_scenario.rs b/core/integration/tests/server/scenarios/message_size_scenario.rs index 14b240d6a..fb047ef2c 100644 --- a/core/integration/tests/server/scenarios/message_size_scenario.rs +++ b/core/integration/tests/server/scenarios/message_size_scenario.rs @@ -19,7 +19,7 @@ use bytes::Bytes; use iggy::prelude::*; use integration::harness::{TestHarness, assert_clean_system}; -use std::collections::HashMap; +use std::collections::BTreeMap; const STREAM_NAME: &str = "test-stream"; const TOPIC_NAME: &str = "test-topic"; @@ -199,8 +199,8 @@ fn create_string_of_size(size: usize) -> String { "x".repeat(size) } -fn create_message_header_of_size(target_size: usize) -> HashMap<HeaderKey, HeaderValue> { - let mut headers = HashMap::new(); +fn create_message_header_of_size(target_size: usize) -> BTreeMap<HeaderKey, HeaderValue> { + let mut headers = BTreeMap::new(); let mut current_size = 0; let mut header_id: u32 = 0; diff --git a/core/integration/tests/server/scenarios/offset_scenario.rs b/core/integration/tests/server/scenarios/offset_scenario.rs index 282e355fa..15003a06b 100644 --- 
a/core/integration/tests/server/scenarios/offset_scenario.rs +++ b/core/integration/tests/server/scenarios/offset_scenario.rs @@ -19,9 +19,10 @@ use super::PARTITION_ID; use bytes::BytesMut; use iggy::prelude::*; -use iggy_common::user_headers_from_bytes; +use iggy_binary_protocol::WireUserHeaders; +use iggy_common::wire_conversions::user_headers_from_wire; use integration::harness::TestHarness; -use std::collections::HashMap; +use std::collections::BTreeMap; fn small_batches() -> Vec<u32> { vec![3, 4, 5, 6, 7] @@ -173,7 +174,7 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage { payload.resize(message_size as usize, 0xD); let payload = payload.freeze(); - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key_1").unwrap(), HeaderValue::try_from("Value 1").unwrap(), @@ -404,7 +405,8 @@ async fn verify_message_content( ); if let Some(headers) = &msg.user_headers { - let headers_map = user_headers_from_bytes(headers.clone()).unwrap(); + let headers_map = + user_headers_from_wire(&WireUserHeaders::from_slice(headers).unwrap()).unwrap(); assert_eq!(headers_map.len(), 3, "Expected 3 headers"); } } diff --git a/core/integration/tests/server/scenarios/timestamp_scenario.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs index 661970dd6..41b3be957 100644 --- a/core/integration/tests/server/scenarios/timestamp_scenario.rs +++ b/core/integration/tests/server/scenarios/timestamp_scenario.rs @@ -19,9 +19,10 @@ use super::PARTITION_ID; use bytes::BytesMut; use iggy::prelude::*; -use iggy_common::user_headers_from_bytes; +use iggy_binary_protocol::WireUserHeaders; +use iggy_common::wire_conversions::user_headers_from_wire; use integration::harness::TestHarness; -use std::collections::HashMap; +use std::collections::BTreeMap; use tokio::time::{Duration, sleep}; fn small_batches() -> Vec<u32> { @@ -197,7 +198,7 @@ fn create_single_message(id: u32, message_size: u64) -> IggyMessage { 
payload.resize(message_size as usize, 0xD); let payload = payload.freeze(); - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key_1").unwrap(), HeaderValue::try_from("Value 1").unwrap(), @@ -428,7 +429,8 @@ async fn verify_message_content_by_timestamp( ); if let Some(headers) = &msg.user_headers { - let headers_map = user_headers_from_bytes(headers.clone()).unwrap(); + let headers_map = + user_headers_from_wire(&WireUserHeaders::from_slice(headers).unwrap()).unwrap(); assert_eq!(headers_map.len(), 3, "Expected 3 headers"); } } diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index 72e426ce5..3cb66f07a 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -63,7 +63,6 @@ pub use iggy_common::{ TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportEndpoints, TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig, WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, locking, - user_headers_from_bytes, user_headers_to_bytes, }; pub use iggy_common::{ Client, ClusterClient, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, diff --git a/core/tools/src/data-seeder/seeder.rs b/core/tools/src/data-seeder/seeder.rs index 8a047bda1..0d60ef4ab 100644 --- a/core/tools/src/data-seeder/seeder.rs +++ b/core/tools/src/data-seeder/seeder.rs @@ -18,7 +18,7 @@ use iggy::prelude::*; use rand::RngExt; -use std::collections::HashMap; +use std::collections::BTreeMap; const PROD_STREAM_NAME: &str = "prod"; const TEST_STREAM_NAME: &str = "test"; @@ -155,7 +155,7 @@ async fn send_messages(client: &IggyClient, streams: &[(String, u32)]) -> Result let headers = match rng.random_bool(0.5) { false => None, true => { - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("key 1")?, HeaderValue::try_from("value1")?, diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 
0645f553b..67005fd50 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -28,6 +28,7 @@ chrono = { workspace = true } clap = { workspace = true } futures-util = { workspace = true } iggy = { workspace = true } +iggy_common = { workspace = true } lz4_flex = { workspace = true } rand = { workspace = true } serde = { workspace = true } diff --git a/examples/rust/src/message-headers/message-compression/consumer/main.rs b/examples/rust/src/message-headers/message-compression/consumer/main.rs index 3dc14d308..32e6050a4 100644 --- a/examples/rust/src/message-headers/message-compression/consumer/main.rs +++ b/examples/rust/src/message-headers/message-compression/consumer/main.rs @@ -19,6 +19,7 @@ use bytes::Bytes; use futures_util::stream::StreamExt; use iggy::prelude::*; +use iggy_common::wire_conversions::user_headers_to_wire; // The compression and decompression utilities are shared between the producer and consumer compression examples. // Hence, we import them here. use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, TOPIC_NAME}; @@ -76,12 +77,13 @@ fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), IggyError // remove the compression header since payload is now decompressed if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() { headers_map.remove(&Codec::header_key()); - let headers_bytes = user_headers_to_bytes(&headers_map); - msg.message.header.user_headers_length = headers_bytes.len() as u32; - msg.message.user_headers = if headers_map.is_empty() { - None + if headers_map.is_empty() { + msg.message.header.user_headers_length = 0; + msg.message.user_headers = None; } else { - Some(headers_bytes) + let wire = user_headers_to_wire(&headers_map); + msg.message.header.user_headers_length = wire.as_bytes().len() as u32; + msg.message.user_headers = Some(wire.into_bytes()); }; } } diff --git a/examples/rust/src/message-headers/message-compression/producer/main.rs 
b/examples/rust/src/message-headers/message-compression/producer/main.rs index 5c258b314..ed06a0626 100644 --- a/examples/rust/src/message-headers/message-compression/producer/main.rs +++ b/examples/rust/src/message-headers/message-compression/producer/main.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use iggy::prelude::*; -use std::collections::HashMap; +use std::collections::BTreeMap; // The compression and decompression utilities are shared between the producer and consumer compression examples. // Hence, we import them here. use iggy_examples::shared::codec::{Codec, NUM_MESSAGES, STREAM_NAME, TOPIC_NAME}; @@ -59,7 +59,7 @@ async fn main() -> Result<(), IggyError> { // NOTE: This is where the Codec is used to prepare the compression user-header for the IggyMessage. let key = Codec::header_key(); let value = codec.to_header_value(); - let compression_headers = HashMap::from([(key, value)]); + let compression_headers = BTreeMap::from([(key, value)]); // Generate artificial example messages to send to the server. let mut messages = Vec::new(); diff --git a/examples/rust/src/message-headers/message-type/producer/main.rs b/examples/rust/src/message-headers/message-type/producer/main.rs index f394fddfe..85596106f 100644 --- a/examples/rust/src/message-headers/message-type/producer/main.rs +++ b/examples/rust/src/message-headers/message-type/producer/main.rs @@ -22,7 +22,7 @@ use iggy::prelude::*; use iggy_examples::shared::args::Args; use iggy_examples::shared::messages_generator::MessagesGenerator; use iggy_examples::shared::system; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; use tracing::info; @@ -83,7 +83,7 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy let json = serializable_message.to_json(); // The message type will be stored in the custom message header. 
- let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("message_type").unwrap(), HeaderValue::try_from(message_type).unwrap(), diff --git a/examples/rust/src/message-headers/typed-headers/producer/main.rs b/examples/rust/src/message-headers/typed-headers/producer/main.rs index 3e6d3c5a8..a9a36b730 100644 --- a/examples/rust/src/message-headers/typed-headers/producer/main.rs +++ b/examples/rust/src/message-headers/typed-headers/producer/main.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use iggy::prelude::*; use iggy_examples::shared::args::Args; use iggy_examples::shared::system; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::error::Error; use std::sync::Arc; use tracing::info; @@ -78,7 +78,7 @@ async fn produce_messages(args: &Args, client: &IggyClient) -> Result<(), Box<dy for _ in 0..args.messages_per_batch { message_id += 1; - let mut headers = HashMap::new(); + let mut headers = BTreeMap::new(); headers.insert( HeaderKey::try_from("event_type")?, HeaderValue::try_from("user_action")?,
