This is an automated email from the ASF dual-hosted git repository.
leerho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b0e0135 refactor: replace byteorder with SketchBytes and SketchSlice
(#49)
b0e0135 is described below
commit b0e0135abd9a73dd29d2b2520b97b400893cc3b7
Author: tison <[email protected]>
AuthorDate: Wed Dec 31 00:16:42 2025 +0800
refactor: replace byteorder with SketchBytes and SketchSlice (#49)
* refactor: replace byteorder with SketchBytes and SketchSlice
Signed-off-by: tison <[email protected]>
* for hll
Signed-off-by: tison <[email protected]>
* deser hll
Signed-off-by: tison <[email protected]>
* fixup
Signed-off-by: tison <[email protected]>
* fmt
Signed-off-by: tison <[email protected]>
---------
Signed-off-by: tison <[email protected]>
---
Cargo.lock | 22 +--
Cargo.toml | 2 -
datasketches/Cargo.toml | 2 -
datasketches/src/codec.rs | 243 +++++++++++++++++++++++++++
datasketches/src/countmin/sketch.rs | 50 +++---
datasketches/src/hash/mod.rs | 11 ++
datasketches/src/hash/murmurhash.rs | 20 +--
datasketches/src/hash/xxhash.rs | 15 +-
datasketches/src/hll/array4.rs | 97 +++++------
datasketches/src/hll/array6.rs | 66 ++++----
datasketches/src/hll/array8.rs | 70 ++++----
datasketches/src/hll/hash_set.rs | 82 +++++----
datasketches/src/hll/list.rs | 60 +++----
datasketches/src/hll/serialization.rs | 106 +-----------
datasketches/src/hll/sketch.rs | 53 ++++--
datasketches/src/lib.rs | 1 +
datasketches/src/tdigest/sketch.rs | 90 ++++------
datasketches/tests/hll_serialization_test.rs | 10 +-
18 files changed, 544 insertions(+), 456 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 66212a0..f2aefc1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -61,12 +61,6 @@ dependencies = [
"windows-sys",
]
-[[package]]
-name = "anyhow"
-version = "1.0.100"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61"
-
[[package]]
name = "autocfg"
version = "1.5.0"
@@ -79,12 +73,6 @@ version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
-[[package]]
-name = "byteorder"
-version = "1.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
-
[[package]]
name = "clap"
version = "4.5.53"
@@ -135,8 +123,6 @@ checksum =
"b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
name = "datasketches"
version = "0.1.0"
dependencies = [
- "anyhow",
- "byteorder",
"googletest",
]
@@ -233,9 +219,9 @@ checksum =
"384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "proc-macro2"
-version = "1.0.103"
+version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
+checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0"
dependencies = [
"unicode-ident",
]
@@ -280,9 +266,9 @@ checksum =
"7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "rustix"
-version = "1.1.2"
+version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
+checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34"
dependencies = [
"bitflags",
"errno",
diff --git a/Cargo.toml b/Cargo.toml
index 4c93ee2..30c7a35 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -32,8 +32,6 @@ rust-version = "1.85.0"
datasketches = { path = "datasketches" }
# Crates.io dependencies
-anyhow = { version = "1.0.100" }
-byteorder = { version = "1.5.0" }
clap = { version = "4.5.20", features = ["derive"] }
googletest = { version = "0.14.2" }
which = { version = "8.0.0" }
diff --git a/datasketches/Cargo.toml b/datasketches/Cargo.toml
index 8a150ff..7d70103 100644
--- a/datasketches/Cargo.toml
+++ b/datasketches/Cargo.toml
@@ -35,8 +35,6 @@ all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
-anyhow = { workspace = true }
-byteorder = { workspace = true }
[dev-dependencies]
googletest = { workspace = true }
diff --git a/datasketches/src/codec.rs b/datasketches/src/codec.rs
new file mode 100644
index 0000000..4df7b22
--- /dev/null
+++ b/datasketches/src/codec.rs
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#![allow(dead_code)]
+
+use std::io;
+use std::io::Cursor;
+use std::io::Read;
+
+/// Append-only byte buffer used to encode a sketch into its serialized form.
+///
+/// Values are appended in call order. Multi-byte values use the byte order
+/// named by the method suffix (`_le` = little-endian, `_be` = big-endian).
+pub(crate) struct SketchBytes {
+    bytes: Vec<u8>,
+}
+
+impl SketchBytes {
+    /// Creates an empty buffer that can hold `capacity` bytes before reallocating.
+    pub fn with_capacity(capacity: usize) -> Self {
+        let bytes = Vec::with_capacity(capacity);
+        Self { bytes }
+    }
+
+    /// Consumes the buffer, returning everything written so far.
+    pub fn into_bytes(self) -> Vec<u8> {
+        let Self { bytes } = self;
+        bytes
+    }
+
+    /// Appends `buf` verbatim.
+    pub fn write(&mut self, buf: &[u8]) {
+        self.bytes.extend_from_slice(buf);
+    }
+
+    /// Appends a single unsigned byte.
+    pub fn write_u8(&mut self, n: u8) {
+        self.bytes.push(n);
+    }
+
+    /// Appends a single signed byte (two's-complement bit pattern).
+    pub fn write_i8(&mut self, n: i8) {
+        self.write_u8(n as u8);
+    }
+
+    /// Appends `n` as 2 little-endian bytes.
+    pub fn write_u16_le(&mut self, n: u16) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 2 big-endian bytes.
+    pub fn write_u16_be(&mut self, n: u16) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends `n` as 2 little-endian bytes.
+    pub fn write_i16_le(&mut self, n: i16) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 2 big-endian bytes.
+    pub fn write_i16_be(&mut self, n: i16) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends `n` as 4 little-endian bytes.
+    pub fn write_u32_le(&mut self, n: u32) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 4 big-endian bytes.
+    pub fn write_u32_be(&mut self, n: u32) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends `n` as 4 little-endian bytes.
+    pub fn write_i32_le(&mut self, n: i32) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 4 big-endian bytes.
+    pub fn write_i32_be(&mut self, n: i32) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends `n` as 8 little-endian bytes.
+    pub fn write_u64_le(&mut self, n: u64) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 8 big-endian bytes.
+    pub fn write_u64_be(&mut self, n: u64) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends `n` as 8 little-endian bytes.
+    pub fn write_i64_le(&mut self, n: i64) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends `n` as 8 big-endian bytes.
+    pub fn write_i64_be(&mut self, n: i64) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends the IEEE-754 bits of `n` as 4 little-endian bytes.
+    pub fn write_f32_le(&mut self, n: f32) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends the IEEE-754 bits of `n` as 4 big-endian bytes.
+    pub fn write_f32_be(&mut self, n: f32) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+
+    /// Appends the IEEE-754 bits of `n` as 8 little-endian bytes.
+    pub fn write_f64_le(&mut self, n: f64) {
+        self.bytes.extend_from_slice(&n.to_le_bytes());
+    }
+
+    /// Appends the IEEE-754 bits of `n` as 8 big-endian bytes.
+    pub fn write_f64_be(&mut self, n: f64) {
+        self.bytes.extend_from_slice(&n.to_be_bytes());
+    }
+}
+
+/// Read cursor over a borrowed byte slice, used to decode serialized sketches.
+///
+/// Every `read_*` method consumes exactly the encoded size of the value it
+/// returns. It propagates the error from `Read::read_exact` (end of input)
+/// when fewer bytes remain.
+pub(crate) struct SketchSlice<'a> {
+    slice: Cursor<&'a [u8]>,
+}
+
+impl SketchSlice<'_> {
+    /// Creates a cursor positioned at the start of `slice`.
+    pub fn new(slice: &[u8]) -> SketchSlice<'_> {
+        SketchSlice {
+            slice: Cursor::new(slice),
+        }
+    }
+
+    /// Skips `n` bytes without reading them.
+    ///
+    /// Moving past the end is not an error here; the next read fails instead.
+    /// The position saturates at `u64::MAX` rather than overflowing. This way a
+    /// huge `n` can never panic in debug builds, nor wrap the cursor back into
+    /// already-consumed data in release builds.
+    pub fn advance(&mut self, n: u64) {
+        let target = self.slice.position().saturating_add(n);
+        self.slice.set_position(target);
+    }
+
+    /// Fills `buf` completely from the current position.
+    pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
+        self.slice.read_exact(buf)
+    }
+
+    /// Reads the next `N` bytes into a fixed-size array.
+    fn read_array<const N: usize>(&mut self) -> io::Result<[u8; N]> {
+        let mut buf = [0u8; N];
+        self.read_exact(&mut buf)?;
+        Ok(buf)
+    }
+
+    /// Reads one unsigned byte.
+    pub fn read_u8(&mut self) -> io::Result<u8> {
+        self.read_array::<1>().map(u8::from_le_bytes)
+    }
+
+    /// Reads one signed byte (two's-complement bit pattern).
+    pub fn read_i8(&mut self) -> io::Result<i8> {
+        self.read_array::<1>().map(i8::from_le_bytes)
+    }
+
+    /// Reads a little-endian `u16`.
+    pub fn read_u16_le(&mut self) -> io::Result<u16> {
+        self.read_array::<2>().map(u16::from_le_bytes)
+    }
+
+    /// Reads a big-endian `u16`.
+    pub fn read_u16_be(&mut self) -> io::Result<u16> {
+        self.read_array::<2>().map(u16::from_be_bytes)
+    }
+
+    /// Reads a little-endian `i16`.
+    pub fn read_i16_le(&mut self) -> io::Result<i16> {
+        self.read_array::<2>().map(i16::from_le_bytes)
+    }
+
+    /// Reads a big-endian `i16`.
+    pub fn read_i16_be(&mut self) -> io::Result<i16> {
+        self.read_array::<2>().map(i16::from_be_bytes)
+    }
+
+    /// Reads a little-endian `u32`.
+    pub fn read_u32_le(&mut self) -> io::Result<u32> {
+        self.read_array::<4>().map(u32::from_le_bytes)
+    }
+
+    /// Reads a big-endian `u32`.
+    pub fn read_u32_be(&mut self) -> io::Result<u32> {
+        self.read_array::<4>().map(u32::from_be_bytes)
+    }
+
+    /// Reads a little-endian `i32`.
+    pub fn read_i32_le(&mut self) -> io::Result<i32> {
+        self.read_array::<4>().map(i32::from_le_bytes)
+    }
+
+    /// Reads a big-endian `i32`.
+    pub fn read_i32_be(&mut self) -> io::Result<i32> {
+        self.read_array::<4>().map(i32::from_be_bytes)
+    }
+
+    /// Reads a little-endian `u64`.
+    pub fn read_u64_le(&mut self) -> io::Result<u64> {
+        self.read_array::<8>().map(u64::from_le_bytes)
+    }
+
+    /// Reads a big-endian `u64`.
+    pub fn read_u64_be(&mut self) -> io::Result<u64> {
+        self.read_array::<8>().map(u64::from_be_bytes)
+    }
+
+    /// Reads a little-endian `i64`.
+    pub fn read_i64_le(&mut self) -> io::Result<i64> {
+        self.read_array::<8>().map(i64::from_le_bytes)
+    }
+
+    /// Reads a big-endian `i64`.
+    pub fn read_i64_be(&mut self) -> io::Result<i64> {
+        self.read_array::<8>().map(i64::from_be_bytes)
+    }
+
+    /// Reads a little-endian IEEE-754 `f32`.
+    pub fn read_f32_le(&mut self) -> io::Result<f32> {
+        self.read_array::<4>().map(f32::from_le_bytes)
+    }
+
+    /// Reads a big-endian IEEE-754 `f32`.
+    pub fn read_f32_be(&mut self) -> io::Result<f32> {
+        self.read_array::<4>().map(f32::from_be_bytes)
+    }
+
+    /// Reads a little-endian IEEE-754 `f64`.
+    pub fn read_f64_le(&mut self) -> io::Result<f64> {
+        self.read_array::<8>().map(f64::from_le_bytes)
+    }
+
+    /// Reads a big-endian IEEE-754 `f64`.
+    pub fn read_f64_be(&mut self) -> io::Result<f64> {
+        self.read_array::<8>().map(f64::from_be_bytes)
+    }
+}
diff --git a/datasketches/src/countmin/sketch.rs
b/datasketches/src/countmin/sketch.rs
index 3df99f3..ca08bff 100644
--- a/datasketches/src/countmin/sketch.rs
+++ b/datasketches/src/countmin/sketch.rs
@@ -17,12 +17,10 @@
use std::hash::Hash;
use std::hash::Hasher;
-use std::io::Cursor;
use std::mem::size_of;
-use byteorder::LE;
-use byteorder::ReadBytesExt;
-
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::countmin::serialization::COUNTMIN_FAMILY_ID;
use crate::countmin::serialization::FLAGS_IS_EMPTY;
use crate::countmin::serialization::LONG_SIZE_BYTES;
@@ -204,28 +202,28 @@ impl CountMinSketch {
} else {
LONG_SIZE_BYTES + (self.counts.len() * size_of::<i64>())
};
- let mut bytes = Vec::with_capacity(header_size + payload_size);
+ let mut bytes = SketchBytes::with_capacity(header_size + payload_size);
- bytes.push(PREAMBLE_LONGS_SHORT);
- bytes.push(SERIAL_VERSION);
- bytes.push(COUNTMIN_FAMILY_ID);
- bytes.push(if self.is_empty() { FLAGS_IS_EMPTY } else { 0 });
- bytes.extend_from_slice(&0u32.to_le_bytes());
+ bytes.write_u8(PREAMBLE_LONGS_SHORT);
+ bytes.write_u8(SERIAL_VERSION);
+ bytes.write_u8(COUNTMIN_FAMILY_ID);
+ bytes.write_u8(if self.is_empty() { FLAGS_IS_EMPTY } else { 0 });
+ bytes.write_u32_le(0); // unused
- bytes.extend_from_slice(&self.num_buckets.to_le_bytes());
- bytes.push(self.num_hashes);
- bytes.extend_from_slice(&compute_seed_hash(self.seed).to_le_bytes());
- bytes.push(0u8);
+ bytes.write_u32_le(self.num_buckets);
+ bytes.write_u8(self.num_hashes);
+ bytes.write_u16_le(compute_seed_hash(self.seed));
+ bytes.write_u8(0);
if self.is_empty() {
- return bytes;
+ return bytes.into_bytes();
}
- bytes.extend_from_slice(&self.total_weight.to_le_bytes());
- for count in &self.counts {
- bytes.extend_from_slice(&count.to_le_bytes());
+ bytes.write_i64_le(self.total_weight);
+ for count in self.counts.iter().copied() {
+ bytes.write_i64_le(count);
}
- bytes
+ bytes.into_bytes()
}
/// Deserializes a sketch from bytes using the default seed.
@@ -239,12 +237,12 @@ impl CountMinSketch {
move |_| Error::insufficient_data(tag)
}
- let mut cursor = Cursor::new(bytes);
+ let mut cursor = SketchSlice::new(bytes);
let preamble_longs =
cursor.read_u8().map_err(make_error("preamble_longs"))?;
let serial_version =
cursor.read_u8().map_err(make_error("serial_version"))?;
let family_id = cursor.read_u8().map_err(make_error("family_id"))?;
let flags = cursor.read_u8().map_err(make_error("flags"))?;
- cursor.read_u32::<LE>().map_err(make_error("unused32"))?;
+ cursor.read_u32_le().map_err(make_error("<unused>"))?;
if family_id != COUNTMIN_FAMILY_ID {
return Err(Error::invalid_family(
@@ -266,9 +264,9 @@ impl CountMinSketch {
));
}
- let num_buckets =
cursor.read_u32::<LE>().map_err(make_error("num_buckets"))?;
+ let num_buckets =
cursor.read_u32_le().map_err(make_error("num_buckets"))?;
let num_hashes = cursor.read_u8().map_err(make_error("num_hashes"))?;
- let seed_hash =
cursor.read_u16::<LE>().map_err(make_error("seed_hash"))?;
+ let seed_hash = cursor.read_u16_le().map_err(make_error("seed_hash"))?;
cursor.read_u8().map_err(make_error("unused8"))?;
let expected_seed_hash = compute_seed_hash(seed);
@@ -284,11 +282,9 @@ impl CountMinSketch {
return Ok(sketch);
}
- sketch.total_weight = cursor
- .read_i64::<LE>()
- .map_err(make_error("total_weight"))?;
+ sketch.total_weight =
cursor.read_i64_le().map_err(make_error("total_weight"))?;
for count in sketch.counts.iter_mut() {
- *count = cursor.read_i64::<LE>().map_err(make_error("counts"))?;
+ *count = cursor.read_i64_le().map_err(make_error("counts"))?;
}
Ok(sketch)
}
diff --git a/datasketches/src/hash/mod.rs b/datasketches/src/hash/mod.rs
index a094f39..d9df24a 100644
--- a/datasketches/src/hash/mod.rs
+++ b/datasketches/src/hash/mod.rs
@@ -36,3 +36,14 @@ pub(crate) use self::xxhash::XxHash64;
/// original source key value and the hashed bit string would be violated.
Once you have developed
/// a history of stored sketches you are stuck with it.
pub(crate) const DEFAULT_UPDATE_SEED: u64 = 9001;
+
+/// Decodes a little-endian `u64` from at most eight bytes.
+///
+/// Shorter inputs are zero-extended. The given bytes fill the low-order
+/// positions and missing high-order bytes are zero. For example, a 4-byte
+/// slice yields the same value as a little-endian `u32` widened to `u64`.
+///
+/// # Panics
+///
+/// Panics if `bytes.len()` is greater than 8.
+fn read_u64_le(bytes: &[u8]) -> u64 {
+    let len = bytes.len();
+    let mut word = [0u8; 8];
+    word[..len].copy_from_slice(bytes);
+    u64::from_le_bytes(word)
+}
diff --git a/datasketches/src/hash/murmurhash.rs
b/datasketches/src/hash/murmurhash.rs
index e275e36..a99f5eb 100644
--- a/datasketches/src/hash/murmurhash.rs
+++ b/datasketches/src/hash/murmurhash.rs
@@ -17,9 +17,6 @@
use std::hash::Hasher;
-use byteorder::ByteOrder;
-use byteorder::LE;
-
use crate::hash::DEFAULT_UPDATE_SEED;
const C1: u64 = 0x87c37b91114253d5;
@@ -58,11 +55,8 @@ impl MurmurHash3X64128 {
if rem > 0 {
if rem > 8 {
// read k2 little endian
- let mut buf = [0u8; 8];
- let k2_len = rem - 8;
- buf[..k2_len].copy_from_slice(&self.buf[8..rem]);
+ let mut k2 = super::read_u64_le(&self.buf[8..rem]);
// mix k2
- let mut k2 = u64::from_le_bytes(buf);
k2 = k2.wrapping_mul(C2);
k2 = k2.rotate_left(33);
k2 = k2.wrapping_mul(C1);
@@ -70,11 +64,9 @@ impl MurmurHash3X64128 {
}
// read k1 little endian
- let mut buf = [0u8; 8];
let k1_len = rem.min(8);
- buf[..k1_len].copy_from_slice(&self.buf[..k1_len]);
+ let mut k1 = super::read_u64_le(&self.buf[..k1_len]);
// mix k1
- let mut k1 = u64::from_le_bytes(buf);
k1 = k1.wrapping_mul(C1);
k1 = k1.rotate_left(31);
k1 = k1.wrapping_mul(C2);
@@ -143,8 +135,8 @@ impl Hasher for MurmurHash3X64128 {
let wanted = 16 - self.buf_len;
self.buf[self.buf_len..].copy_from_slice(&bytes[..wanted]);
- let k1 = LE::read_u64(&self.buf[0..8]);
- let k2 = LE::read_u64(&self.buf[8..16]);
+ let k1 = super::read_u64_le(&self.buf[0..8]);
+ let k2 = super::read_u64_le(&self.buf[8..16]);
self.update(k1, k2);
bytes = &bytes[wanted..];
@@ -160,8 +152,8 @@ impl Hasher for MurmurHash3X64128 {
let lo = i << 4;
let mi = lo + 8;
let hi = mi + 8;
- let k1 = LE::read_u64(&bytes[lo..mi]);
- let k2 = LE::read_u64(&bytes[mi..hi]);
+ let k1 = super::read_u64_le(&bytes[lo..mi]);
+ let k2 = super::read_u64_le(&bytes[mi..hi]);
self.update(k1, k2);
}
diff --git a/datasketches/src/hash/xxhash.rs b/datasketches/src/hash/xxhash.rs
index d1f0eed..6232c04 100644
--- a/datasketches/src/hash/xxhash.rs
+++ b/datasketches/src/hash/xxhash.rs
@@ -17,9 +17,6 @@
use std::hash::Hasher;
-use byteorder::ByteOrder;
-use byteorder::LE;
-
const DEFAULT_SEED: u64 = 0;
// Unsigned 64-bit primes from xxhash64.
@@ -79,7 +76,7 @@ impl XxHash64 {
let mut idx = 0;
let buf = &self.buffer[..self.buffer_len];
while idx + 8 <= buf.len() {
- let mut k1 = LE::read_u64(&buf[idx..idx + 8]);
+ let mut k1 = super::read_u64_le(&buf[idx..idx + 8]);
k1 = k1.wrapping_mul(P2);
k1 = k1.rotate_left(31);
k1 = k1.wrapping_mul(P1);
@@ -89,7 +86,7 @@ impl XxHash64 {
}
if idx + 4 <= buf.len() {
- let k1 = LE::read_u32(&buf[idx..idx + 4]) as u64;
+ let k1 = super::read_u64_le(&buf[idx..idx + 4]);
hash ^= k1.wrapping_mul(P1);
hash = hash.rotate_left(23).wrapping_mul(P2).wrapping_add(P3);
idx += 4;
@@ -119,10 +116,10 @@ impl XxHash64 {
+    /// Folds one 32-byte chunk into the accumulators: the four little-endian
+    /// `u64` words of `chunk` are combined into `v1`..`v4` via `round`.
+    ///
+    /// Panics if `chunk` is shorter than 32 bytes (the slice indexing fails).
#[inline]
fn update(&mut self, chunk: &[u8]) {
- self.v1 = round(self.v1, LE::read_u64(&chunk[0..8]));
- self.v2 = round(self.v2, LE::read_u64(&chunk[8..16]));
- self.v3 = round(self.v3, LE::read_u64(&chunk[16..24]));
- self.v4 = round(self.v4, LE::read_u64(&chunk[24..32]));
+ self.v1 = round(self.v1, super::read_u64_le(&chunk[0..8]));
+ self.v2 = round(self.v2, super::read_u64_le(&chunk[8..16]));
+ self.v3 = round(self.v3, super::read_u64_le(&chunk[16..24]));
+ self.v4 = round(self.v4, super::read_u64_le(&chunk[24..32]));
}
}
diff --git a/datasketches/src/hll/array4.rs b/datasketches/src/hll/array4.rs
index fbef5e4..3944a41 100644
--- a/datasketches/src/hll/array4.rs
+++ b/datasketches/src/hll/array4.rs
@@ -21,6 +21,8 @@
//! When values exceed 4 bits after cur_min offset, they're stored in an
auxiliary hash map.
use super::aux_map::AuxMap;
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::NumStdDev;
use crate::hll::estimator::HipEstimator;
@@ -285,67 +287,48 @@ impl Array4 {
///
/// Expects full HLL preamble (40 bytes) followed by packed 4-bit data and
optional aux map.
pub fn deserialize(
- bytes: &[u8],
+ mut cursor: SketchSlice,
+ cur_min: u8,
lg_config_k: u8,
compact: bool,
ooo: bool,
) -> Result<Self, Error> {
use crate::hll::get_slot;
use crate::hll::get_value;
- use crate::hll::serialization::*;
- if bytes.len() < HLL_PREAMBLE_SIZE {
- return Err(Error::insufficient_data(format!(
- "expected at least {}, got {}",
- HLL_PREAMBLE_SIZE,
- bytes.len()
- )));
+ fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) ->
Error {
+ move |_| Error::insufficient_data(tag)
}
let num_bytes = 1 << (lg_config_k - 1); // k/2 bytes for 4-bit packing
- // Read cur_min from header
- let cur_min = bytes[HLL_CUR_MIN_BYTE];
-
// Read HIP estimator values from preamble
- let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE);
- let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE);
- let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE);
+ let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?;
+ let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?;
+ let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?;
// Read num_at_cur_min and aux_count
- let num_at_cur_min = read_u32_le(bytes, CUR_MIN_COUNT_INT);
- let aux_count = read_u32_le(bytes, AUX_COUNT_INT);
-
- // Calculate expected length
- let expected_len = if compact {
- HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch
- } else {
- HLL_PREAMBLE_SIZE + num_bytes + (aux_count as usize *
COUPON_SIZE_BYTES)
- };
+ let num_at_cur_min =
cursor.read_u32_le().map_err(make_error("num_at_cur_min"))?;
+ let aux_count = cursor.read_u32_le().map_err(make_error("aux_count"))?;
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
- }
-
- // Read packed 4-bit byte array from HLL_BYTE_ARR_START
+ // Read packed 4-bit byte array
let mut data = vec![0u8; num_bytes];
if !compact {
- data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START
+ num_bytes]);
+ cursor.read_exact(&mut data).map_err(make_error("data"))?;
+ } else {
+ cursor.advance(num_bytes as u64);
}
// Read aux map if present
let mut aux_map = None;
if aux_count > 0 {
let mut aux = AuxMap::new(lg_config_k);
- let aux_start = HLL_BYTE_ARR_START + num_bytes;
-
for i in 0..aux_count {
- let offset = aux_start + (i as usize * COUPON_SIZE_BYTES);
- let coupon = read_u32_le(bytes, offset);
+ let coupon = cursor.read_u32_le().map_err(|_| {
+ Error::insufficient_data(format!(
+ "expected {aux_count} aux coupons, failed at index
{i}",
+ ))
+ })?;
let slot = get_slot(coupon) & ((1 << lg_config_k) - 1);
let value = get_value(coupon);
aux.insert(slot, value);
@@ -388,51 +371,49 @@ impl Array4 {
let aux_count = aux_entries.len() as u32;
let total_size = HLL_PREAMBLE_SIZE + num_bytes + (aux_count as usize *
COUPON_SIZE_BYTES);
- let mut bytes = vec![0u8; total_size];
+ let mut bytes = SketchBytes::with_capacity(total_size);
// Write standard header
- bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS;
- bytes[SER_VER_BYTE] = SERIAL_VER;
- bytes[FAMILY_BYTE] = HLL_FAMILY_ID;
- bytes[LG_K_BYTE] = lg_config_k;
- bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode
+ bytes.write_u8(HLL_PREINTS);
+ bytes.write_u8(SERIAL_VER);
+ bytes.write_u8(HLL_FAMILY_ID);
+ bytes.write_u8(lg_config_k);
+ bytes.write_u8(0); // unused for HLL mode
// Write flags
let mut flags = 0u8;
if self.estimator.is_out_of_order() {
flags |= OUT_OF_ORDER_FLAG_MASK;
}
- bytes[FLAGS_BYTE] = flags;
+ bytes.write_u8(flags);
// Write cur_min
- bytes[HLL_CUR_MIN_BYTE] = self.cur_min;
+ bytes.write_u8(self.cur_min);
// Mode byte: HLL mode with HLL4 type
- bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL4);
+ bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL4));
// Write HIP estimator values
- write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum());
- write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0());
- write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1());
+ bytes.write_f64_le(self.estimator.hip_accum());
+ bytes.write_f64_le(self.estimator.kxq0());
+ bytes.write_f64_le(self.estimator.kxq1());
// Write num_at_cur_min
- write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_at_cur_min);
+ bytes.write_u32_le(self.num_at_cur_min);
// Write aux_count
- write_u32_le(&mut bytes, AUX_COUNT_INT, aux_count);
+ bytes.write_u32_le(aux_count);
// Write packed 4-bit byte array
- bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START +
num_bytes].copy_from_slice(&self.bytes);
+ bytes.write(&self.bytes);
// Write aux map entries if present
- let aux_start = HLL_BYTE_ARR_START + num_bytes;
- for (i, (slot, value)) in aux_entries.iter().enumerate() {
- let offset = aux_start + (i * COUPON_SIZE_BYTES);
- let coupon = pack_coupon(*slot, *value);
- write_u32_le(&mut bytes, offset, coupon);
+ for (slot, value) in aux_entries.iter().copied() {
+ let coupon = pack_coupon(slot, value);
+ bytes.write_u32_le(coupon);
}
- bytes
+ bytes.into_bytes()
}
}
diff --git a/datasketches/src/hll/array6.rs b/datasketches/src/hll/array6.rs
index 5d36cb5..f247a67 100644
--- a/datasketches/src/hll/array6.rs
+++ b/datasketches/src/hll/array6.rs
@@ -21,6 +21,8 @@
//! This is sufficient for most HLL use cases without needing exception
handling or
//! cur_min optimization like Array4.
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::NumStdDev;
use crate::hll::estimator::HipEstimator;
@@ -169,41 +171,33 @@ impl Array6 {
///
/// Expects full HLL preamble (40 bytes) followed by packed 6-bit data.
pub fn deserialize(
- bytes: &[u8],
+ mut cursor: SketchSlice,
lg_config_k: u8,
compact: bool,
ooo: bool,
) -> Result<Self, Error> {
- use crate::hll::serialization::*;
+ fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) ->
Error {
+ move |_| Error::insufficient_data(tag)
+ }
let k = 1 << lg_config_k;
let num_bytes = num_bytes_for_k(k);
- let expected_len = if compact {
- HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch
- } else {
- HLL_PREAMBLE_SIZE + num_bytes
- };
-
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
- }
// Read HIP estimator values from preamble
- let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE);
- let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE);
- let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE);
+ let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?;
+ let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?;
+ let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?;
// Read num_at_cur_min (for Array6, this is num_zeros since cur_min=0)
- let num_zeros = read_u32_le(bytes, CUR_MIN_COUNT_INT);
+ let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?;
+ let _aux_count =
cursor.read_u32_le().map_err(make_error("aux_count"))?; // always 0
// Read packed byte array from offset HLL_BYTE_ARR_START
let mut data = vec![0u8; num_bytes];
if !compact {
- data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START
+ num_bytes]);
+ cursor.read_exact(&mut data).map_err(make_error("data"))?;
+ } else {
+ cursor.advance(num_bytes as u64);
}
// Create estimator and restore state
@@ -230,43 +224,43 @@ impl Array6 {
let k = 1 << lg_config_k;
let num_bytes = num_bytes_for_k(k);
let total_size = HLL_PREAMBLE_SIZE + num_bytes;
- let mut bytes = vec![0u8; total_size];
+ let mut bytes = SketchBytes::with_capacity(total_size);
// Write standard header
- bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS;
- bytes[SER_VER_BYTE] = SERIAL_VER;
- bytes[FAMILY_BYTE] = HLL_FAMILY_ID;
- bytes[LG_K_BYTE] = lg_config_k;
- bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode
+ bytes.write_u8(HLL_PREINTS);
+ bytes.write_u8(SERIAL_VER);
+ bytes.write_u8(HLL_FAMILY_ID);
+ bytes.write_u8(lg_config_k);
+ bytes.write_u8(0); // unused for HLL mode
// Write flags
let mut flags = 0u8;
if self.estimator.is_out_of_order() {
flags |= OUT_OF_ORDER_FLAG_MASK;
}
- bytes[FLAGS_BYTE] = flags;
+ bytes.write_u8(flags);
// cur_min is always 0 for Array6
- bytes[HLL_CUR_MIN_BYTE] = 0;
+ bytes.write_u8(0);
// Mode byte: HLL mode with HLL6 type
- bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL6);
+ bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL6));
// Write HIP estimator values
- write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum());
- write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0());
- write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1());
+ bytes.write_f64_le(self.estimator.hip_accum());
+ bytes.write_f64_le(self.estimator.kxq0());
+ bytes.write_f64_le(self.estimator.kxq1());
// Write num_at_cur_min (num_zeros for Array6)
- write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_zeros);
+ bytes.write_u32_le(self.num_zeros);
// Write aux_count (always 0 for Array6)
- write_u32_le(&mut bytes, AUX_COUNT_INT, 0);
+ bytes.write_u32_le(0);
// Write packed byte array
- bytes[HLL_BYTE_ARR_START..].copy_from_slice(&self.bytes);
+ bytes.write(&self.bytes);
- bytes
+ bytes.into_bytes()
}
}
diff --git a/datasketches/src/hll/array8.rs b/datasketches/src/hll/array8.rs
index dd3b556..3ac1f0c 100644
--- a/datasketches/src/hll/array8.rs
+++ b/datasketches/src/hll/array8.rs
@@ -20,6 +20,8 @@
//! Array8 is the simplest HLL array implementation, storing one byte per slot.
//! This provides the maximum value range (0-255) with no bit-packing
complexity.
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::NumStdDev;
use crate::hll::estimator::HipEstimator;
@@ -243,40 +245,32 @@ impl Array8 {
///
/// Expects full HLL preamble (40 bytes) followed by k bytes of data.
pub fn deserialize(
- bytes: &[u8],
+ mut cursor: SketchSlice,
lg_config_k: u8,
compact: bool,
ooo: bool,
) -> Result<Self, Error> {
- use crate::hll::serialization::*;
-
- let k = 1 << lg_config_k;
- let expected_len = if compact {
- HLL_PREAMBLE_SIZE // Just preamble for compact empty sketch
- } else {
- HLL_PREAMBLE_SIZE + k as usize
- };
-
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
+ fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) ->
Error {
+ move |_| Error::insufficient_data(tag)
}
+ let k = 1usize << lg_config_k;
+
// Read HIP estimator values from preamble
- let hip_accum = read_f64_le(bytes, HIP_ACCUM_DOUBLE);
- let kxq0 = read_f64_le(bytes, KXQ0_DOUBLE);
- let kxq1 = read_f64_le(bytes, KXQ1_DOUBLE);
+ let hip_accum = cursor.read_f64_le().map_err(make_error("hip_accum"))?;
+ let kxq0 = cursor.read_f64_le().map_err(make_error("kxq0"))?;
+ let kxq1 = cursor.read_f64_le().map_err(make_error("kxq1"))?;
// Read num_at_cur_min (for Array8, this is num_zeros since cur_min=0)
- let num_zeros = read_u32_le(bytes, CUR_MIN_COUNT_INT);
+ let num_zeros = cursor.read_u32_le().map_err(make_error("num_zeros"))?;
+ let _aux_count =
cursor.read_u32_le().map_err(make_error("aux_count"))?; // always 0
// Read byte array from offset HLL_BYTE_ARR_START
- let mut data = vec![0u8; k as usize];
+ let mut data = vec![0u8; k];
if !compact {
- data.copy_from_slice(&bytes[HLL_BYTE_ARR_START..HLL_BYTE_ARR_START
+ k as usize]);
+ cursor.read_exact(&mut data).map_err(make_error("data"))?;
+ } else {
+ cursor.advance(k as u64);
}
// Create estimator and restore state
@@ -302,43 +296,43 @@ impl Array8 {
let k = 1 << lg_config_k;
let total_size = HLL_PREAMBLE_SIZE + k as usize;
- let mut bytes = vec![0u8; total_size];
+ let mut bytes = SketchBytes::with_capacity(total_size);
// Write standard header
- bytes[PREAMBLE_INTS_BYTE] = HLL_PREINTS;
- bytes[SER_VER_BYTE] = SERIAL_VER;
- bytes[FAMILY_BYTE] = HLL_FAMILY_ID;
- bytes[LG_K_BYTE] = lg_config_k;
- bytes[LG_ARR_BYTE] = 0; // Not used for HLL mode
+ bytes.write_u8(HLL_PREINTS);
+ bytes.write_u8(SERIAL_VER);
+ bytes.write_u8(HLL_FAMILY_ID);
+ bytes.write_u8(lg_config_k);
+ bytes.write_u8(0); // unused for HLL mode
// Write flags
let mut flags = 0u8;
if self.estimator.is_out_of_order() {
flags |= OUT_OF_ORDER_FLAG_MASK;
}
- bytes[FLAGS_BYTE] = flags;
+ bytes.write_u8(flags);
// cur_min is always 0 for Array8
- bytes[HLL_CUR_MIN_BYTE] = 0;
+ bytes.write_u8(0);
// Mode byte: HLL mode with HLL8 type
- bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_HLL, TGT_HLL8);
+ bytes.write_u8(encode_mode_byte(CUR_MODE_HLL, TGT_HLL8));
// Write HIP estimator values
- write_f64_le(&mut bytes, HIP_ACCUM_DOUBLE, self.estimator.hip_accum());
- write_f64_le(&mut bytes, KXQ0_DOUBLE, self.estimator.kxq0());
- write_f64_le(&mut bytes, KXQ1_DOUBLE, self.estimator.kxq1());
+ bytes.write_f64_le(self.estimator.hip_accum());
+ bytes.write_f64_le(self.estimator.kxq0());
+ bytes.write_f64_le(self.estimator.kxq1());
// Write num_at_cur_min (num_zeros for Array8)
- write_u32_le(&mut bytes, CUR_MIN_COUNT_INT, self.num_zeros);
+ bytes.write_u32_le(self.num_zeros);
// Write aux_count (always 0 for Array8)
- write_u32_le(&mut bytes, AUX_COUNT_INT, 0);
+ bytes.write_u32_le(0);
// Write byte array
- bytes[HLL_BYTE_ARR_START..].copy_from_slice(&self.bytes);
+ bytes.write(&self.bytes);
- bytes
+ bytes.into_bytes()
}
}
diff --git a/datasketches/src/hll/hash_set.rs b/datasketches/src/hll/hash_set.rs
index f6bff05..1a31031 100644
--- a/datasketches/src/hll/hash_set.rs
+++ b/datasketches/src/hll/hash_set.rs
@@ -20,6 +20,8 @@
//! Uses open addressing with a custom stride function to handle collisions.
//! Provides better performance than List when many coupons are stored.
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::HllType;
use crate::hll::KEY_MASK_26;
@@ -84,49 +86,42 @@ impl HashSet {
}
/// Deserialize a HashSet from bytes
- pub fn deserialize(bytes: &[u8], compact: bool) -> Result<Self, Error> {
+ pub fn deserialize(
+ mut cursor: SketchSlice,
+ lg_arr: usize,
+ compact: bool,
+ ) -> Result<Self, Error> {
// Read coupon count from bytes 8-11
- let coupon_count = read_u32_le(bytes, HASH_SET_COUNT_INT) as usize;
-
- // Compute array size
- let lg_arr = bytes[LG_ARR_BYTE] as usize;
+ let coupon_count = cursor
+ .read_u32_le()
+ .map_err(|_| Error::insufficient_data("coupon_count"))?;
+ let coupon_count = coupon_count as usize;
if compact {
// Compact mode: only couponCount coupons are stored
- let expected_len = HASH_SET_INT_ARR_START + (coupon_count * 4);
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
- }
-
// Create a new hash set and insert coupons one by one
let mut hash_set = HashSet::new(lg_arr);
for i in 0..coupon_count {
- let offset = HASH_SET_INT_ARR_START + i * COUPON_SIZE_BYTES;
- let coupon = read_u32_le(bytes, offset);
+ let coupon = cursor.read_u32_le().map_err(|_| {
+ Error::insufficient_data(format!(
+ "expected {coupon_count} coupons, failed at index {i}"
+ ))
+ })?;
hash_set.update(coupon);
}
Ok(hash_set)
} else {
// Non-compact mode: full hash table with empty slots
let array_size = 1 << lg_arr;
- let expected_len = HASH_SET_INT_ARR_START + (array_size * 4);
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
- }
// Read entire hash table including empty slots
let mut coupons = vec![0u32; array_size];
for (i, coupon) in coupons.iter_mut().enumerate() {
- let offset = HASH_SET_INT_ARR_START + i * COUPON_SIZE_BYTES;
- *coupon = read_u32_le(bytes, offset);
+ *coupon = cursor.read_u32_le().map_err(|_| {
+ Error::insufficient_data(format!(
+ "expected {array_size} coupons, failed at index {i}"
+ ))
+ })?;
}
Ok(Self {
@@ -147,29 +142,32 @@ impl HashSet {
// Compute size
let array_size = if compact { coupon_count } else { 1 << lg_arr };
- let total_size = HASH_SET_INT_ARR_START + (array_size * 4);
+ let total_size = SET_PREAMBLE_SIZE + (array_size * 4);
- let mut bytes = vec![0u8; total_size];
+ let mut bytes = SketchBytes::with_capacity(total_size);
// Write preamble
- bytes[PREAMBLE_INTS_BYTE] = HASH_SET_PREINTS;
- bytes[SER_VER_BYTE] = SERIAL_VER;
- bytes[FAMILY_BYTE] = HLL_FAMILY_ID;
- bytes[LG_K_BYTE] = lg_config_k;
- bytes[LG_ARR_BYTE] = lg_arr as u8;
+ bytes.write_u8(HASH_SET_PREINTS);
+ bytes.write_u8(SERIAL_VER);
+ bytes.write_u8(HLL_FAMILY_ID);
+ bytes.write_u8(lg_config_k);
+ bytes.write_u8(lg_arr as u8);
// Write flags
let mut flags = 0u8;
if compact {
flags |= COMPACT_FLAG_MASK;
}
- bytes[FLAGS_BYTE] = flags;
+ bytes.write_u8(flags);
+
+ // Write unused byte
+ bytes.write_u8(0);
// Write mode byte: SET mode with target HLL type
- bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_SET, hll_type as u8);
+ bytes.write_u8(encode_mode_byte(CUR_MODE_SET, hll_type as u8));
// Write coupon count
- write_u32_le(&mut bytes, HASH_SET_COUNT_INT, coupon_count as u32);
+ bytes.write_u32_le(coupon_count as u32);
// Write coupons
if compact {
@@ -183,18 +181,16 @@ impl HashSet {
.collect();
coupons_vec.sort_unstable();
- for (i, coupon) in coupons_vec.iter().enumerate() {
- let offset = HASH_SET_INT_ARR_START + i * 4;
- bytes[offset..offset +
4].copy_from_slice(&coupon.to_le_bytes());
+ for coupon in coupons_vec.iter().copied() {
+ bytes.write_u32_le(coupon);
}
} else {
// Non-compact mode: write entire hash table
- for (i, coupon) in self.container.coupons.iter().enumerate() {
- let offset = HASH_SET_INT_ARR_START + i * 4;
- bytes[offset..offset +
4].copy_from_slice(&coupon.to_le_bytes());
+ for coupon in self.container.coupons.iter().copied() {
+ bytes.write_u32_le(coupon);
}
}
- bytes
+ bytes.into_bytes()
}
}
diff --git a/datasketches/src/hll/list.rs b/datasketches/src/hll/list.rs
index c705383..2fa9173 100644
--- a/datasketches/src/hll/list.rs
+++ b/datasketches/src/hll/list.rs
@@ -20,6 +20,8 @@
//! Provides sequential storage with linear search for duplicates.
//! Efficient for small numbers of coupons before transitioning to HashSet.
+use crate::codec::SketchBytes;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::HllType;
use crate::hll::container::COUPON_EMPTY;
@@ -66,30 +68,25 @@ impl List {
}
/// Deserialize a List from bytes
- pub fn deserialize(bytes: &[u8], empty: bool, compact: bool) ->
Result<Self, Error> {
- // Read coupon count from byte 6
- let coupon_count = bytes[LIST_COUNT_BYTE] as usize;
-
+ pub fn deserialize(
+ mut cursor: SketchSlice,
+ lg_arr: usize,
+ coupon_count: usize,
+ empty: bool,
+ compact: bool,
+ ) -> Result<Self, Error> {
// Compute array size
- let lg_arr = bytes[LG_ARR_BYTE] as usize;
let array_size = if compact { coupon_count } else { 1 << lg_arr };
- // Validate length
- let expected_len = LIST_INT_ARR_START + (array_size * 4);
- if bytes.len() < expected_len {
- return Err(Error::insufficient_data(format!(
- "expected {}, got {}",
- expected_len,
- bytes.len()
- )));
- }
-
// Read coupons
let mut coupons = vec![0u32; array_size];
if !empty && coupon_count > 0 {
for (i, coupon) in coupons.iter_mut().enumerate() {
- let offset = LIST_INT_ARR_START + i * COUPON_SIZE_BYTES;
- *coupon = read_u32_le(bytes, offset);
+ *coupon = cursor.read_u32_le().map_err(|_| {
+ Error::insufficient_data(format!(
+ "expect {coupon_count} coupons, failed at index {i}"
+ ))
+ })?;
}
}
@@ -107,16 +104,16 @@ impl List {
// Compute size
let array_size = if compact { coupon_count } else { 1 << lg_arr };
- let total_size = LIST_INT_ARR_START + (array_size * 4);
+ let total_size = LIST_PREAMBLE_SIZE + (array_size * 4);
- let mut bytes = vec![0u8; total_size];
+ let mut bytes = SketchBytes::with_capacity(total_size);
// Write preamble
- bytes[PREAMBLE_INTS_BYTE] = LIST_PREINTS;
- bytes[SER_VER_BYTE] = SERIAL_VER;
- bytes[FAMILY_BYTE] = HLL_FAMILY_ID;
- bytes[LG_K_BYTE] = lg_config_k;
- bytes[LG_ARR_BYTE] = lg_arr as u8;
+ bytes.write_u8(LIST_PREINTS);
+ bytes.write_u8(SERIAL_VER);
+ bytes.write_u8(HLL_FAMILY_ID);
+ bytes.write_u8(lg_config_k);
+ bytes.write_u8(lg_arr as u8);
// Write flags
let mut flags = 0u8;
@@ -126,23 +123,22 @@ impl List {
if compact {
flags |= COMPACT_FLAG_MASK;
}
- bytes[FLAGS_BYTE] = flags;
+ bytes.write_u8(flags);
// Write count
- bytes[LIST_COUNT_BYTE] = coupon_count as u8;
+ bytes.write_u8(coupon_count as u8);
// Write mode byte: LIST mode with target HLL type
- bytes[MODE_BYTE] = encode_mode_byte(CUR_MODE_LIST, hll_type as u8);
+ bytes.write_u8(encode_mode_byte(CUR_MODE_LIST, hll_type as u8));
// Write coupons (only non-empty ones if compact)
if !empty {
let mut write_idx = 0;
- for coupon in &self.container.coupons {
- if compact && *coupon == 0 {
+ for coupon in self.container.coupons.iter().copied() {
+ if compact && coupon == 0 {
continue; // Skip empty coupons in compact mode
}
- let offset = LIST_INT_ARR_START + write_idx * 4;
- write_u32_le(&mut bytes, offset, *coupon);
+ bytes.write_u32_le(coupon);
write_idx += 1;
if write_idx >= array_size {
break;
@@ -150,6 +146,6 @@ impl List {
}
}
- bytes
+ bytes.into_bytes()
}
}
diff --git a/datasketches/src/hll/serialization.rs
b/datasketches/src/hll/serialization.rs
index b99a262..30f034f 100644
--- a/datasketches/src/hll/serialization.rs
+++ b/datasketches/src/hll/serialization.rs
@@ -28,76 +28,22 @@ pub const SERIAL_VER: u8 = 1;
/// Flag indicating sketch is empty (no values inserted)
pub const EMPTY_FLAG_MASK: u8 = 4;
-
/// Flag indicating compact serialization (no empty slots stored)
pub const COMPACT_FLAG_MASK: u8 = 8;
-
/// Flag indicating out-of-order mode (HIP estimator invalid)
pub const OUT_OF_ORDER_FLAG_MASK: u8 = 16;
-/// Offset of preamble size field (in 4-byte ints)
-pub const PREAMBLE_INTS_BYTE: usize = 0;
-
-/// Offset of serialization version byte
-pub const SER_VER_BYTE: usize = 1;
-
-/// Offset of family ID byte
-pub const FAMILY_BYTE: usize = 2;
-
-/// Offset of lg_config_k byte
-pub const LG_K_BYTE: usize = 3;
-
-/// Offset of lg_arr (array size) byte
-pub const LG_ARR_BYTE: usize = 4;
-
-/// Offset of flags byte
-pub const FLAGS_BYTE: usize = 5;
-
-/// Offset of mode byte (current mode in low 2 bits, target type in bits 2-3)
-pub const MODE_BYTE: usize = 7;
-
/// Preamble size for LIST mode (8 bytes = 2 ints)
pub const LIST_PREINTS: u8 = 2;
-
/// Preamble size for SET mode (12 bytes = 3 ints)
pub const HASH_SET_PREINTS: u8 = 3;
-
/// Preamble size for HLL mode (40 bytes = 10 ints)
pub const HLL_PREINTS: u8 = 10;
-/// Offset of coupon count byte in LIST mode
-pub const LIST_COUNT_BYTE: usize = 6;
-
-/// Offset where coupon array starts in LIST mode
-pub const LIST_INT_ARR_START: usize = 8;
-
-/// Offset of coupon count in SET mode (4-byte int)
-pub const HASH_SET_COUNT_INT: usize = 8;
-
-/// Offset where coupon array starts in SET mode
-pub const HASH_SET_INT_ARR_START: usize = 12;
-
-/// Offset of cur_min byte in HLL mode header
-pub const HLL_CUR_MIN_BYTE: usize = 6;
-
-/// Offset of HIP accumulator (8-byte double) in HLL preamble
-pub const HIP_ACCUM_DOUBLE: usize = 8;
-
-/// Offset of KxQ0 register (8-byte double) in HLL preamble
-pub const KXQ0_DOUBLE: usize = 16;
-
-/// Offset of KxQ1 register (8-byte double) in HLL preamble
-pub const KXQ1_DOUBLE: usize = 24;
-
-/// Offset of num_at_cur_min (4-byte int) in HLL preamble
-pub const CUR_MIN_COUNT_INT: usize = 32;
-
-/// Offset of aux_count (4-byte int) in HLL preamble
-pub const AUX_COUNT_INT: usize = 36;
-
-/// Offset where HLL byte array data starts
-pub const HLL_BYTE_ARR_START: usize = 40;
-
+/// Total size of LIST preamble in bytes
+pub const LIST_PREAMBLE_SIZE: usize = 8;
+/// Total size of SET preamble in bytes
+pub const SET_PREAMBLE_SIZE: usize = 12;
/// Total size of HLL preamble in bytes
pub const HLL_PREAMBLE_SIZE: usize = 40;
@@ -148,47 +94,3 @@ pub const TGT_HLL8: u8 = 2;
/// Size of a single coupon in bytes (u32)
pub const COUPON_SIZE_BYTES: usize = 4;
-
-/// Size of a double (f64) in bytes
-pub const DOUBLE_SIZE_BYTES: usize = 8;
-
-/// Size of an int (u32) in bytes
-pub const INT_SIZE_BYTES: usize = 4;
-
-/// Read an u32 value from bytes at the given offset (little-endian)
-#[inline]
-pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
- u32::from_le_bytes([
- bytes[offset],
- bytes[offset + 1],
- bytes[offset + 2],
- bytes[offset + 3],
- ])
-}
-
-/// Read a f64 value from bytes at the given offset (little-endian)
-#[inline]
-pub fn read_f64_le(bytes: &[u8], offset: usize) -> f64 {
- f64::from_le_bytes([
- bytes[offset],
- bytes[offset + 1],
- bytes[offset + 2],
- bytes[offset + 3],
- bytes[offset + 4],
- bytes[offset + 5],
- bytes[offset + 6],
- bytes[offset + 7],
- ])
-}
-
-/// Write an u32 value to bytes at the given offset (little-endian)
-#[inline]
-pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) {
- bytes[offset..offset +
INT_SIZE_BYTES].copy_from_slice(&value.to_le_bytes());
-}
-
-/// Write a f64 value to bytes at the given offset (little-endian)
-#[inline]
-pub fn write_f64_le(bytes: &mut [u8], offset: usize, value: f64) {
- bytes[offset..offset +
DOUBLE_SIZE_BYTES].copy_from_slice(&value.to_le_bytes());
-}
diff --git a/datasketches/src/hll/sketch.rs b/datasketches/src/hll/sketch.rs
index a5f0aac..ef76647 100644
--- a/datasketches/src/hll/sketch.rs
+++ b/datasketches/src/hll/sketch.rs
@@ -22,6 +22,7 @@
use std::hash::Hash;
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::hll::HllType;
use crate::hll::NumStdDev;
@@ -213,19 +214,26 @@ impl HllSketch {
/// Deserializes an HLL sketch from bytes
pub fn deserialize(bytes: &[u8]) -> Result<HllSketch, Error> {
- if bytes.len() < 8 {
- return Err(Error::insufficient_data(
- "sketch data too short (< 8 bytes)",
- ));
+ fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) ->
Error {
+ move |_| Error::insufficient_data(tag)
}
+ let mut cursor = SketchSlice::new(bytes);
+
// Read and validate preamble
- let preamble_ints = bytes[PREAMBLE_INTS_BYTE];
- let serial_ver = bytes[SER_VER_BYTE];
- let family_id = bytes[FAMILY_BYTE];
- let lg_config_k = bytes[LG_K_BYTE];
- let flags = bytes[FLAGS_BYTE];
- let mode_byte = bytes[MODE_BYTE];
+ let preamble_ints =
cursor.read_u8().map_err(make_error("preamble_ints"))?;
+ let serial_version =
cursor.read_u8().map_err(make_error("serial_version"))?;
+ let family_id = cursor.read_u8().map_err(make_error("family_id"))?;
+ let lg_config_k = cursor.read_u8().map_err(make_error("lg_config_k"))?;
+ // lg_arr used in List/Set modes
+ let lg_arr = cursor.read_u8().map_err(make_error("lg_arr"))?;
+ let flags = cursor.read_u8().map_err(make_error("flags"))?;
+ // The contextual state byte:
+ // * coupon count in LIST mode
+ // * cur_min in HLL mode
+ // * unused in SET mode
+ let state = cursor.read_u8().map_err(make_error("state"))?;
+ let mode_byte = cursor.read_u8().map_err(make_error("mode"))?;
// Verify family ID
if family_id != HLL_FAMILY_ID {
@@ -233,8 +241,11 @@ impl HllSketch {
}
// Verify serialization version
- if serial_ver != SERIAL_VER {
- return Err(Error::unsupported_serial_version(SERIAL_VER,
serial_ver));
+ if serial_version != SERIAL_VER {
+ return Err(Error::unsupported_serial_version(
+ SERIAL_VER,
+ serial_version,
+ ));
}
// Verify lg_k range (4-21 are valid)
@@ -268,7 +279,9 @@ impl HllSketch {
)));
}
- let list = List::deserialize(bytes, empty, compact)?;
+ let lg_arr = lg_arr as usize;
+ let coupon_count = state as usize;
+ let list = List::deserialize(cursor, lg_arr, coupon_count,
empty, compact)?;
Mode::List { list, hll_type }
}
CUR_MODE_SET => {
@@ -279,7 +292,8 @@ impl HllSketch {
)));
}
- let set = HashSet::deserialize(bytes, compact)?;
+ let lg_arr = lg_arr as usize;
+ let set = HashSet::deserialize(cursor, lg_arr, compact)?;
Mode::Set { set, hll_type }
}
CUR_MODE_HLL => {
@@ -291,11 +305,14 @@ impl HllSketch {
}
match hll_type {
- HllType::Hll4 => Array4::deserialize(bytes,
lg_config_k, compact, ooo)
- .map(Mode::Array4)?,
- HllType::Hll6 => Array6::deserialize(bytes,
lg_config_k, compact, ooo)
+ HllType::Hll4 => {
+ let cur_min = state;
+ Array4::deserialize(cursor, cur_min, lg_config_k,
compact, ooo)
+ .map(Mode::Array4)?
+ }
+ HllType::Hll6 => Array6::deserialize(cursor,
lg_config_k, compact, ooo)
.map(Mode::Array6)?,
- HllType::Hll8 => Array8::deserialize(bytes,
lg_config_k, compact, ooo)
+ HllType::Hll8 => Array8::deserialize(cursor,
lg_config_k, compact, ooo)
.map(Mode::Array8)?,
}
}
diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs
index fc4679e..f436565 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/lib.rs
@@ -36,4 +36,5 @@ pub mod hll;
pub mod tdigest;
pub mod theta;
+mod codec;
mod hash;
diff --git a/datasketches/src/tdigest/sketch.rs
b/datasketches/src/tdigest/sketch.rs
index a0f3883..c9ec382 100644
--- a/datasketches/src/tdigest/sketch.rs
+++ b/datasketches/src/tdigest/sketch.rs
@@ -17,13 +17,9 @@
use std::cmp::Ordering;
use std::convert::identity;
-use std::io::Cursor;
use std::num::NonZeroU64;
-use byteorder::BE;
-use byteorder::LE;
-use byteorder::ReadBytesExt;
-
+use crate::codec::SketchSlice;
use crate::error::Error;
use crate::error::ErrorKind;
use crate::tdigest::serialization::*;
@@ -375,7 +371,7 @@ impl TDigestMut {
move |_| Error::insufficient_data(tag)
}
- let mut cursor = Cursor::new(bytes);
+ let mut cursor = SketchSlice::new(bytes);
let preamble_longs =
cursor.read_u8().map_err(make_error("preamble_longs"))?;
let serial_version =
cursor.read_u8().map_err(make_error("serial_version"))?;
@@ -397,7 +393,7 @@ impl TDigestMut {
serial_version,
));
}
- let k = cursor.read_u16::<LE>().map_err(make_error("k"))?;
+ let k = cursor.read_u16_le().map_err(make_error("k"))?;
if k < 10 {
return Err(Error::deserial(format!("k must be at least 10, got
{k}")));
}
@@ -415,7 +411,7 @@ impl TDigestMut {
preamble_longs,
));
}
- cursor.read_u16::<LE>().map_err(make_error("<unused>"))?; // unused
+ cursor.read_u16_le().map_err(make_error("<unused>"))?; // unused
if is_empty {
return Ok(TDigestMut::new(k));
}
@@ -423,13 +419,9 @@ impl TDigestMut {
let reverse_merge = (flags & FLAGS_REVERSE_MERGE) != 0;
if is_single_value {
let value = if is_f32 {
- cursor
- .read_f32::<LE>()
- .map_err(make_error("single_value"))? as f64
+ cursor.read_f32_le().map_err(make_error("single_value"))? as
f64
} else {
- cursor
- .read_f64::<LE>()
- .map_err(make_error("single_value"))?
+ cursor.read_f64_le().map_err(make_error("single_value"))?
};
check_non_nan(value, "single_value")?;
check_finite(value, "single_value")?;
@@ -446,21 +438,17 @@ impl TDigestMut {
vec![],
));
}
- let num_centroids = cursor
- .read_u32::<LE>()
- .map_err(make_error("num_centroids"))? as usize;
- let num_buffered = cursor
- .read_u32::<LE>()
- .map_err(make_error("num_buffered"))? as usize;
+ let num_centroids =
cursor.read_u32_le().map_err(make_error("num_centroids"))? as usize;
+ let num_buffered =
cursor.read_u32_le().map_err(make_error("num_buffered"))? as usize;
let (min, max) = if is_f32 {
(
- cursor.read_f32::<LE>().map_err(make_error("min"))? as f64,
- cursor.read_f32::<LE>().map_err(make_error("max"))? as f64,
+ cursor.read_f32_le().map_err(make_error("min"))? as f64,
+ cursor.read_f32_le().map_err(make_error("max"))? as f64,
)
} else {
(
- cursor.read_f64::<LE>().map_err(make_error("min"))?,
- cursor.read_f64::<LE>().map_err(make_error("max"))?,
+ cursor.read_f64_le().map_err(make_error("min"))?,
+ cursor.read_f64_le().map_err(make_error("max"))?,
)
};
check_non_nan(min, "min")?;
@@ -470,13 +458,13 @@ impl TDigestMut {
for _ in 0..num_centroids {
let (mean, weight) = if is_f32 {
(
- cursor.read_f32::<LE>().map_err(make_error("mean"))? as
f64,
- cursor.read_u32::<LE>().map_err(make_error("weight"))? as
u64,
+ cursor.read_f32_le().map_err(make_error("mean"))? as f64,
+ cursor.read_u32_le().map_err(make_error("weight"))? as u64,
)
} else {
(
- cursor.read_f64::<LE>().map_err(make_error("mean"))?,
- cursor.read_u64::<LE>().map_err(make_error("weight"))?,
+ cursor.read_f64_le().map_err(make_error("mean"))?,
+ cursor.read_u64_le().map_err(make_error("weight"))?,
)
};
check_non_nan(mean, "centroid mean")?;
@@ -488,13 +476,9 @@ impl TDigestMut {
let mut buffer = Vec::with_capacity(num_buffered);
for _ in 0..num_buffered {
let value = if is_f32 {
- cursor
- .read_f32::<LE>()
- .map_err(make_error("buffered_value"))? as f64
+ cursor.read_f32_le().map_err(make_error("buffered_value"))? as
f64
} else {
- cursor
- .read_f64::<LE>()
- .map_err(make_error("buffered_value"))?
+ cursor.read_f64_le().map_err(make_error("buffered_value"))?
};
check_non_nan(value, "buffered_value mean")?;
check_finite(value, "buffered_value mean")?;
@@ -518,34 +502,32 @@ impl TDigestMut {
move |_| Error::insufficient_data_of("compat format", tag)
}
- let mut cursor = Cursor::new(bytes);
+ let mut cursor = SketchSlice::new(bytes);
- let ty = cursor.read_u32::<BE>().map_err(make_error("type"))?;
+ let ty = cursor.read_u32_be().map_err(make_error("type"))?;
match ty {
COMPAT_DOUBLE => {
fn make_error(tag: &'static str) -> impl
FnOnce(std::io::Error) -> Error {
move |_| Error::insufficient_data_of("compat double
format", tag)
}
// compatibility with asBytes()
- let min = cursor.read_f64::<BE>().map_err(make_error("min"))?;
- let max = cursor.read_f64::<BE>().map_err(make_error("max"))?;
+ let min = cursor.read_f64_be().map_err(make_error("min"))?;
+ let max = cursor.read_f64_be().map_err(make_error("max"))?;
check_non_nan(min, "min in compat double format")?;
check_non_nan(max, "max in compat double format")?;
- let k = cursor.read_f64::<BE>().map_err(make_error("k"))? as
u16;
+ let k = cursor.read_f64_be().map_err(make_error("k"))? as u16;
if k < 10 {
return Err(Error::deserial(format!(
"k must be at least 10 in compat double format, got
{k}"
)));
}
- let num_centroids = cursor
- .read_u32::<BE>()
- .map_err(make_error("num_centroids"))?
- as usize;
+ let num_centroids =
+ cursor.read_u32_be().map_err(make_error("num_centroids"))?
as usize;
let mut total_weight = 0u64;
let mut centroids = Vec::with_capacity(num_centroids);
for _ in 0..num_centroids {
- let weight =
cursor.read_f64::<BE>().map_err(make_error("weight"))? as u64;
- let mean =
cursor.read_f64::<BE>().map_err(make_error("mean"))?;
+ let weight =
cursor.read_f64_be().map_err(make_error("weight"))? as u64;
+ let mean =
cursor.read_f64_be().map_err(make_error("mean"))?;
let weight = check_nonzero(weight, "centroid weight in
compat double format")?;
check_non_nan(mean, "centroid mean in compat double
format")?;
check_finite(mean, "centroid mean in compat double
format")?;
@@ -568,11 +550,11 @@ impl TDigestMut {
}
// COMPAT_FLOAT: compatibility with asSmallBytes()
// reference implementation uses doubles for min and max
- let min = cursor.read_f64::<BE>().map_err(make_error("min"))?;
- let max = cursor.read_f64::<BE>().map_err(make_error("max"))?;
+ let min = cursor.read_f64_be().map_err(make_error("min"))?;
+ let max = cursor.read_f64_be().map_err(make_error("max"))?;
check_non_nan(min, "min in compat float format")?;
check_non_nan(max, "max in compat float format")?;
- let k = cursor.read_f32::<BE>().map_err(make_error("k"))? as
u16;
+ let k = cursor.read_f32_be().map_err(make_error("k"))? as u16;
if k < 10 {
return Err(Error::deserial(format!(
"k must be at least 10 in compat float format, got {k}"
@@ -580,16 +562,14 @@ impl TDigestMut {
}
// reference implementation stores capacities of the array of
centroids and the
// buffer as shorts they can be derived from k in the
constructor
- cursor.read_u32::<BE>().map_err(make_error("<unused>"))?;
- let num_centroids = cursor
- .read_u16::<BE>()
- .map_err(make_error("num_centroids"))?
- as usize;
+ cursor.read_u32_be().map_err(make_error("<unused>"))?;
+ let num_centroids =
+ cursor.read_u16_be().map_err(make_error("num_centroids"))?
as usize;
let mut total_weight = 0u64;
let mut centroids = Vec::with_capacity(num_centroids);
for _ in 0..num_centroids {
- let weight =
cursor.read_f32::<BE>().map_err(make_error("weight"))? as u64;
- let mean =
cursor.read_f32::<BE>().map_err(make_error("mean"))? as f64;
+ let weight =
cursor.read_f32_be().map_err(make_error("weight"))? as u64;
+ let mean =
cursor.read_f32_be().map_err(make_error("mean"))? as f64;
let weight = check_nonzero(weight, "centroid weight in
compat float format")?;
check_non_nan(mean, "centroid mean in compat float
format")?;
check_finite(mean, "centroid mean in compat float
format")?;
diff --git a/datasketches/tests/hll_serialization_test.rs
b/datasketches/tests/hll_serialization_test.rs
index fc1969c..9c8200f 100644
--- a/datasketches/tests/hll_serialization_test.rs
+++ b/datasketches/tests/hll_serialization_test.rs
@@ -77,12 +77,18 @@ fn test_sketch_file(path: PathBuf, expected_cardinality:
usize, expected_lg_k: u
// Serialize and deserialize again to test round-trip
let serialized_bytes = sketch1.serialize();
- let sketch2 = HllSketch::deserialize(&serialized_bytes).unwrap();
+ let sketch2 =
HllSketch::deserialize(&serialized_bytes).unwrap_or_else(|err| {
+ panic!(
+ "Deserialization failed after round-trip for {}: {}",
+ path.display(),
+ err
+ )
+ });
// Check that both sketches are functionally equivalent
assert_eq!(
- sketch2.lg_config_k(),
sketch1.lg_config_k(),
+ sketch2.lg_config_k(),
"lg_config_k mismatch after round-trip for {}",
path.display()
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]