This is an automated email from the ASF dual-hosted git repository.

leerho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 38fad36  feat: add frequencies sketches (#44)
38fad36 is described below

commit 38fad365f74e49e74085580997c5a6565df4082b
Author: Chojan Shang <[email protected]>
AuthorDate: Wed Dec 31 01:52:35 2025 +0800

    feat: add frequencies sketches (#44)
    
    * feat: add frequencies sketch
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor: align frequencies tests with HLL/TDigest like style
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * test: try to merge more test cases
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor: merge longsketch into itemsketch
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor: align frequencies serde API with project style
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor: align api names
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    * refactor(frequencies): align errors and lower_bound semantics
    
    Signed-off-by: Chojan Shang <[email protected]>
    
    ---------
    
    Signed-off-by: Chojan Shang <[email protected]>
---
 datasketches/src/{lib.rs => frequencies/mod.rs}    |  33 +-
 .../src/frequencies/reverse_purge_item_hash_map.rs | 319 +++++++++++++
 datasketches/src/frequencies/serde.rs              | 108 +++++
 datasketches/src/frequencies/serialization.rs      |  89 ++++
 datasketches/src/frequencies/sketch.rs             | 494 ++++++++++++++++++++
 datasketches/src/lib.rs                            |   1 +
 .../tests/frequencies_serialization_test.rs        | 212 +++++++++
 datasketches/tests/frequencies_update_test.rs      | 506 +++++++++++++++++++++
 8 files changed, 1743 insertions(+), 19 deletions(-)

diff --git a/datasketches/src/lib.rs b/datasketches/src/frequencies/mod.rs
similarity index 52%
copy from datasketches/src/lib.rs
copy to datasketches/src/frequencies/mod.rs
index f436565..6bd7987 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/frequencies/mod.rs
@@ -15,26 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! # Apache® DataSketches™ Core Rust Library Component
+//! Frequency sketches for finding heavy hitters in data streams.
 //!
-//! The Sketching Core Library provides a range of stochastic streaming 
algorithms and closely
-//! related Rust technologies that are particularly useful when integrating 
this technology into
-//! systems that must deal with massive data.
+//! This module implements the Frequent Items sketch from Apache DataSketches. 
It tracks
+//! approximate frequencies in a stream and can report heavy hitters with 
explicit
+//! error guarantees (no false negatives or no false positives).
 //!
-//! This library is divided into modules that constitute distinct groups of 
functionality.
+//! For background, see the Java documentation:
+//! 
<https://apache.github.io/datasketches-java/9.0.0/org/apache/datasketches/frequencies/FrequentItemsSketch.html>
 
-#![cfg_attr(docsrs, feature(doc_cfg))]
-#![deny(missing_docs)]
+mod reverse_purge_item_hash_map;
+mod serde;
+mod serialization;
+mod sketch;
 
-// See https://github.com/apache/datasketches-rust/issues/28 for more 
information.
-#[cfg(target_endian = "big")]
-compile_error!("datasketches does not support big-endian targets");
-
-pub mod countmin;
-pub mod error;
-pub mod hll;
-pub mod tdigest;
-pub mod theta;
-
-mod codec;
-mod hash;
+pub use self::serde::ItemsSerde;
+pub use self::sketch::ErrorType;
+pub use self::sketch::FrequentItemsSketch;
+pub use self::sketch::Row;
diff --git a/datasketches/src/frequencies/reverse_purge_item_hash_map.rs 
b/datasketches/src/frequencies/reverse_purge_item_hash_map.rs
new file mode 100644
index 0000000..9c17e58
--- /dev/null
+++ b/datasketches/src/frequencies/reverse_purge_item_hash_map.rs
@@ -0,0 +1,319 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Reverse purge hash map for generic items.
+//!
+//! This linear-probing hash map supports a reverse purge operation that 
removes
+//! keys with non-positive counts by scanning clusters from the back to the 
front.
+
+use std::hash::Hash;
+use std::hash::Hasher;
+
+use crate::hash::MurmurHash3X64128;
+
+const LOAD_FACTOR: f64 = 0.75;
+const DRIFT_LIMIT: usize = 1024;
+const MAX_SAMPLE_SIZE: usize = 1024;
+
+/// Linear-probing hash map for (item, count) pairs with reverse purge support.
+#[derive(Debug, Clone)]
+pub(super) struct ReversePurgeItemHashMap<T> {
+    lg_length: u8,
+    load_threshold: usize,
+    keys: Vec<Option<T>>,
+    values: Vec<i64>,
+    states: Vec<u16>,
+    num_active: usize,
+}
+
+impl<T: Eq + Hash> ReversePurgeItemHashMap<T> {
+    /// Creates a new map with arrays of length `map_size` (must be a power of 
two).
+    ///
+    /// The load threshold is set to `LOAD_FACTOR * map_size`.
+    pub fn new(map_size: usize) -> Self {
+        assert!(map_size.is_power_of_two(), "map_size must be power of 2");
+        let lg_length = map_size.trailing_zeros() as u8;
+        let load_threshold = (map_size as f64 * LOAD_FACTOR) as usize;
+        Self {
+            lg_length,
+            load_threshold,
+            keys: (0..map_size).map(|_| None).collect(),
+            values: vec![0; map_size],
+            states: vec![0; map_size],
+            num_active: 0,
+        }
+    }
+
+    /// Returns the value for `key`, or zero if the key is not present.
+    pub fn get(&self, key: &T) -> i64 {
+        let probe = self.hash_probe(key);
+        if self.states[probe] > 0 {
+            return self.values[probe];
+        }
+        0
+    }
+
+    /// Adds `adjust_amount` to the value for `key`, inserting if absent.
+    pub fn adjust_or_put_value(&mut self, key: T, adjust_amount: i64) {
+        let mask = self.keys.len() - 1;
+        let mut probe = (hash_item(&key) as usize) & mask;
+        let mut drift: usize = 1;
+        while self.states[probe] != 0 {
+            let matches = self.keys[probe]
+                .as_ref()
+                .map(|existing| existing == &key)
+                .unwrap_or(false);
+            if matches {
+                break;
+            }
+            probe = (probe + 1) & mask;
+            drift += 1;
+            debug_assert!(drift < DRIFT_LIMIT, "drift limit exceeded");
+        }
+        if self.states[probe] == 0 {
+            self.keys[probe] = Some(key);
+            self.values[probe] = adjust_amount;
+            self.states[probe] = drift as u16;
+            self.num_active += 1;
+        } else {
+            self.values[probe] += adjust_amount;
+        }
+    }
+
+    /// Removes all keys with non-positive counts.
+    pub fn keep_only_positive_counts(&mut self) {
+        let len = self.keys.len();
+        let mut first_probe = len - 1;
+        while self.states[first_probe] > 0 {
+            first_probe -= 1;
+        }
+        for probe in (0..first_probe).rev() {
+            if self.states[probe] > 0 && self.values[probe] <= 0 {
+                self.hash_delete(probe);
+                self.num_active -= 1;
+            }
+        }
+        for probe in (first_probe..len).rev() {
+            if self.states[probe] > 0 && self.values[probe] <= 0 {
+                self.hash_delete(probe);
+                self.num_active -= 1;
+            }
+        }
+    }
+
+    /// Shifts all values by `adjust_amount`.
+    ///
+    /// This is used during purges to decrement counters.
+    pub fn adjust_all_values_by(&mut self, adjust_amount: i64) {
+        for value in &mut self.values {
+            *value += adjust_amount;
+        }
+    }
+
+    /// Purges the map by estimating the median count and removing 
non-positive entries.
+    ///
+    /// Returns the estimated median value that was subtracted from all counts.
+    pub fn purge(&mut self, sample_size: usize) -> i64 {
+        let limit = sample_size.min(self.num_active).min(MAX_SAMPLE_SIZE);
+        let mut samples = Vec::with_capacity(limit);
+        let mut i = 0usize;
+        while samples.len() < limit {
+            if self.is_active(i) {
+                samples.push(self.values[i]);
+            }
+            i += 1;
+        }
+        let mid = samples.len() / 2;
+        samples.select_nth_unstable(mid);
+        let median = samples[mid];
+        self.adjust_all_values_by(-median);
+        self.keep_only_positive_counts();
+        median
+    }
+
+    /// Resizes the hash table to `new_size` (must be a power of two).
+    pub fn resize(&mut self, new_size: usize) {
+        assert!(new_size.is_power_of_two(), "new_size must be power of 2");
+        let mut old_keys = std::mem::take(&mut self.keys);
+        let old_values = std::mem::take(&mut self.values);
+        let old_states = std::mem::take(&mut self.states);
+        self.keys = (0..new_size).map(|_| None).collect();
+        self.values = vec![0; new_size];
+        self.states = vec![0; new_size];
+        self.lg_length = new_size.trailing_zeros() as u8;
+        self.load_threshold = (new_size as f64 * LOAD_FACTOR) as usize;
+        self.num_active = 0;
+        for i in 0..old_keys.len() {
+            if old_states[i] > 0 {
+                if let Some(key) = old_keys[i].take() {
+                    self.adjust_or_put_value(key, old_values[i]);
+                }
+            }
+        }
+    }
+
+    /// Returns the length of the underlying arrays.
+    pub fn len(&self) -> usize {
+        self.keys.len()
+    }
+
+    /// Returns the log2 of the underlying array length.
+    pub fn lg_length(&self) -> u8 {
+        self.lg_length
+    }
+
+    /// Returns the maximum number of keys before a purge or resize.
+    pub fn capacity(&self) -> usize {
+        self.load_threshold
+    }
+
+    /// Returns the number of active keys in the map.
+    pub fn num_active(&self) -> usize {
+        self.num_active
+    }
+
+    /// Returns the active keys in the map.
+    pub fn active_keys(&self) -> Vec<T>
+    where
+        T: Clone,
+    {
+        if self.num_active == 0 {
+            return Vec::new();
+        }
+        let mut keys = Vec::with_capacity(self.num_active);
+        for i in 0..self.keys.len() {
+            if self.states[i] > 0 {
+                if let Some(key) = self.keys[i].as_ref() {
+                    keys.push(key.clone());
+                }
+            }
+        }
+        keys
+    }
+
+    /// Returns the active values in the map.
+    pub fn active_values(&self) -> Vec<i64> {
+        if self.num_active == 0 {
+            return Vec::new();
+        }
+        let mut values = Vec::with_capacity(self.num_active);
+        for i in 0..self.values.len() {
+            if self.states[i] > 0 {
+                values.push(self.values[i]);
+            }
+        }
+        values
+    }
+
+    /// Returns an iterator over active keys and values.
+    pub fn iter(&self) -> ReversePurgeItemIter<'_, T> {
+        ReversePurgeItemIter::new(self)
+    }
+
+    fn is_active(&self, probe: usize) -> bool {
+        self.states[probe] > 0
+    }
+
+    fn hash_probe(&self, key: &T) -> usize {
+        let mask = self.keys.len() - 1;
+        let mut probe = (hash_item(key) as usize) & mask;
+        while self.states[probe] > 0 {
+            let matches = self.keys[probe]
+                .as_ref()
+                .map(|existing| existing == key)
+                .unwrap_or(false);
+            if matches {
+                break;
+            }
+            probe = (probe + 1) & mask;
+        }
+        probe
+    }
+
+    fn hash_delete(&mut self, mut delete_probe: usize) {
+        self.states[delete_probe] = 0;
+        self.keys[delete_probe] = None;
+        let mut drift: usize = 1;
+        let mask = self.keys.len() - 1;
+        let mut probe = (delete_probe + drift) & mask;
+        while self.states[probe] != 0 {
+            if self.states[probe] as usize > drift {
+                self.keys[delete_probe] = self.keys[probe].take();
+                self.values[delete_probe] = self.values[probe];
+                self.states[delete_probe] = self.states[probe] - drift as u16;
+                self.states[probe] = 0;
+                drift = 0;
+                delete_probe = probe;
+            }
+            probe = (probe + 1) & mask;
+            drift += 1;
+            debug_assert!(drift < DRIFT_LIMIT, "drift limit exceeded");
+        }
+    }
+}
+
+/// Iterator over active entries using a golden-ratio stride.
+pub struct ReversePurgeItemIter<'a, T> {
+    map: &'a ReversePurgeItemHashMap<T>,
+    index: usize,
+    count: usize,
+    stride: usize,
+    mask: usize,
+}
+
+impl<'a, T> ReversePurgeItemIter<'a, T> {
+    fn new(map: &'a ReversePurgeItemHashMap<T>) -> Self {
+        let size = map.keys.len();
+        let stride = ((size as f64 * 0.6180339887498949) as usize) | 1;
+        let mask = size - 1;
+        let index = 0usize.wrapping_sub(stride);
+        Self {
+            map,
+            index,
+            count: 0,
+            stride,
+            mask,
+        }
+    }
+}
+
+impl<'a, T> Iterator for ReversePurgeItemIter<'a, T> {
+    type Item = (&'a T, i64);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.count >= self.map.num_active {
+            return None;
+        }
+        loop {
+            self.index = self.index.wrapping_add(self.stride) & self.mask;
+            if self.map.states[self.index] > 0 {
+                self.count += 1;
+                let key = self.map.keys[self.index]
+                    .as_ref()
+                    .expect("active key missing");
+                return Some((key, self.map.values[self.index]));
+            }
+        }
+    }
+}
+
+#[inline]
+fn hash_item<T: Hash>(item: &T) -> u64 {
+    let mut hasher = MurmurHash3X64128::default();
+    item.hash(&mut hasher);
+    hasher.finish()
+}
diff --git a/datasketches/src/frequencies/serde.rs 
b/datasketches/src/frequencies/serde.rs
new file mode 100644
index 0000000..376045d
--- /dev/null
+++ b/datasketches/src/frequencies/serde.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Serialization helpers for frequent items sketches.
+
+use std::str;
+
+use crate::error::Error;
+use crate::frequencies::serialization::read_i64_le;
+use crate::frequencies::serialization::read_u32_le;
+
+/// Built-in serializers for frequency sketch items.
+#[non_exhaustive]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ItemsSerde {
+    /// i64 items compatible with ArrayOfLongsSerDe in Java.
+    Int64,
+    /// UTF-8 strings compatible with ArrayOfStringsSerDe in Java.
+    String,
+}
+
+pub(crate) fn serialize_string_items(items: &[String]) -> Vec<u8> {
+    let total_len: usize = items.iter().map(|item| 4 + item.len()).sum();
+    let mut out = Vec::with_capacity(total_len);
+    for item in items {
+        let bytes = item.as_bytes();
+        let len = bytes.len() as u32;
+        out.extend_from_slice(&len.to_le_bytes());
+        out.extend_from_slice(bytes);
+    }
+    out
+}
+
+pub(crate) fn deserialize_string_items(
+    bytes: &[u8],
+    num_items: usize,
+) -> Result<(Vec<String>, usize), Error> {
+    if num_items == 0 {
+        return Ok((Vec::new(), 0));
+    }
+    let mut items = Vec::with_capacity(num_items);
+    let mut offset = 0usize;
+    for _ in 0..num_items {
+        if offset + 4 > bytes.len() {
+            return Err(Error::insufficient_data(
+                "not enough bytes for string length",
+            ));
+        }
+        let len = read_u32_le(bytes, offset) as usize;
+        offset += 4;
+        if offset + len > bytes.len() {
+            return Err(Error::insufficient_data(
+                "not enough bytes for string payload",
+            ));
+        }
+        let slice = &bytes[offset..offset + len];
+        let value = match str::from_utf8(slice) {
+            Ok(s) => s.to_string(),
+            Err(_) => {
+                return Err(Error::deserial("invalid UTF-8 string payload"));
+            }
+        };
+        items.push(value);
+        offset += len;
+    }
+    Ok((items, offset))
+}
+
+pub(crate) fn serialize_i64_items(items: &[i64]) -> Vec<u8> {
+    let mut out = Vec::with_capacity(items.len() * 8);
+    for item in items {
+        out.extend_from_slice(&item.to_le_bytes());
+    }
+    out
+}
+
+pub(crate) fn deserialize_i64_items(
+    bytes: &[u8],
+    num_items: usize,
+) -> Result<(Vec<i64>, usize), Error> {
+    let needed = num_items
+        .checked_mul(8)
+        .ok_or_else(|| Error::deserial("items size overflow"))?;
+    if bytes.len() < needed {
+        return Err(Error::insufficient_data("not enough bytes for i64 items"));
+    }
+    let mut items = Vec::with_capacity(num_items);
+    for i in 0..num_items {
+        let offset = i * 8;
+        let value = read_i64_le(bytes, offset);
+        items.push(value);
+    }
+    Ok((items, needed))
+}
diff --git a/datasketches/src/frequencies/serialization.rs 
b/datasketches/src/frequencies/serialization.rs
new file mode 100644
index 0000000..24e8952
--- /dev/null
+++ b/datasketches/src/frequencies/serialization.rs
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Serialization constants and helpers for frequency sketches.
+
+/// Family ID for frequency sketches.
+pub const FAMILY_ID: u8 = 10;
+/// Serialization version.
+pub const SER_VER: u8 = 1;
+
+/// Preamble longs for empty sketch.
+pub const PREAMBLE_LONGS_EMPTY: u8 = 1;
+/// Preamble longs for non-empty sketch.
+pub const PREAMBLE_LONGS_NONEMPTY: u8 = 4;
+
+/// Empty flag mask (both bits for compatibility).
+pub const EMPTY_FLAG_MASK: u8 = 5;
+
+/// Offset of preamble longs byte.
+pub const PREAMBLE_LONGS_BYTE: usize = 0;
+/// Offset of serialization version byte.
+pub const SER_VER_BYTE: usize = 1;
+/// Offset of family ID byte.
+pub const FAMILY_BYTE: usize = 2;
+/// Offset of lg_max_map_size byte.
+pub const LG_MAX_MAP_SIZE_BYTE: usize = 3;
+/// Offset of lg_cur_map_size byte.
+pub const LG_CUR_MAP_SIZE_BYTE: usize = 4;
+/// Offset of flags byte.
+pub const FLAGS_BYTE: usize = 5;
+
+/// Offset of active items int (low 32 bits of second pre-long).
+pub const ACTIVE_ITEMS_INT: usize = 8;
+/// Offset of stream weight (third pre-long).
+pub const STREAM_WEIGHT_LONG: usize = 16;
+/// Byte position of the offset value (fourth pre-long).
+pub const OFFSET_LONG: usize = 24;
+
+/// Read a u32 value from bytes at the given offset (little-endian).
+#[inline]
+pub fn read_u32_le(bytes: &[u8], offset: usize) -> u32 {
+    u32::from_le_bytes([
+        bytes[offset],
+        bytes[offset + 1],
+        bytes[offset + 2],
+        bytes[offset + 3],
+    ])
+}
+
+/// Read an i64 value from bytes at the given offset (little-endian).
+#[inline]
+pub fn read_i64_le(bytes: &[u8], offset: usize) -> i64 {
+    i64::from_le_bytes([
+        bytes[offset],
+        bytes[offset + 1],
+        bytes[offset + 2],
+        bytes[offset + 3],
+        bytes[offset + 4],
+        bytes[offset + 5],
+        bytes[offset + 6],
+        bytes[offset + 7],
+    ])
+}
+
+/// Write a u32 value to bytes at the given offset (little-endian).
+#[inline]
+pub fn write_u32_le(bytes: &mut [u8], offset: usize, value: u32) {
+    bytes[offset..offset + 4].copy_from_slice(&value.to_le_bytes());
+}
+
+/// Write an i64 value to bytes at the given offset (little-endian).
+#[inline]
+pub fn write_i64_le(bytes: &mut [u8], offset: usize, value: i64) {
+    bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes());
+}
diff --git a/datasketches/src/frequencies/sketch.rs 
b/datasketches/src/frequencies/sketch.rs
new file mode 100644
index 0000000..ac5b884
--- /dev/null
+++ b/datasketches/src/frequencies/sketch.rs
@@ -0,0 +1,494 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Frequent items sketch implementations.
+
+use std::hash::Hash;
+
+use crate::error::Error;
+use crate::error::ErrorKind;
+use crate::frequencies::reverse_purge_item_hash_map::ReversePurgeItemHashMap;
+use crate::frequencies::serde::ItemsSerde;
+use crate::frequencies::serde::deserialize_i64_items;
+use crate::frequencies::serde::deserialize_string_items;
+use crate::frequencies::serde::serialize_i64_items;
+use crate::frequencies::serde::serialize_string_items;
+use crate::frequencies::serialization::*;
+
+type DeserializeItems<T> = fn(&[u8], usize) -> Result<(Vec<T>, usize), Error>;
+
+const LG_MIN_MAP_SIZE: u8 = 3;
+const SAMPLE_SIZE: usize = 1024;
+const EPSILON_FACTOR: f64 = 3.5;
+const LOAD_FACTOR_NUMERATOR: usize = 3;
+const LOAD_FACTOR_DENOMINATOR: usize = 4;
+
+/// Error guarantees for frequent item queries.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ErrorType {
+    /// Include items if upper bound exceeds threshold (no false negatives).
+    NoFalseNegatives,
+    /// Include items if lower bound exceeds threshold (no false positives).
+    NoFalsePositives,
+}
+
+/// Result row for frequent item queries.
+///
+/// Each row includes an estimate and upper and lower bounds on the true 
frequency.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Row<T> {
+    item: T,
+    estimate: i64,
+    upper_bound: i64,
+    lower_bound: u64,
+}
+
+impl<T> Row<T> {
+    /// Returns the item value.
+    pub fn item(&self) -> &T {
+        &self.item
+    }
+
+    /// Returns the estimated frequency.
+    pub fn estimate(&self) -> i64 {
+        self.estimate
+    }
+
+    /// Returns the upper bound for the frequency.
+    pub fn upper_bound(&self) -> i64 {
+        self.upper_bound
+    }
+
+    /// Returns the guaranteed lower bound for the frequency.
+    ///
+    /// This value is never negative.
+    pub fn lower_bound(&self) -> u64 {
+        self.lower_bound
+    }
+}
+
+/// Frequent items sketch for generic item types.
+///
+/// The sketch tracks approximate item frequencies and can return estimates 
with
+/// guaranteed upper and lower bounds.
+///
+/// See [`crate::frequencies`] for an overview and error guarantees.
+#[derive(Debug, Clone)]
+pub struct FrequentItemsSketch<T> {
+    lg_max_map_size: u8,
+    cur_map_cap: usize,
+    offset: i64,
+    stream_weight: i64,
+    sample_size: usize,
+    hash_map: ReversePurgeItemHashMap<T>,
+}
+
+impl<T: Eq + Hash> FrequentItemsSketch<T> {
+    /// Creates a new sketch with the given maximum map size (power of two).
+    ///
+    /// The maximum map capacity is `0.75 * max_map_size`, and the internal 
map grows
+    /// from a small starting size up to the maximum as needed.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `max_map_size` is not a power of two.
+    pub fn new(max_map_size: usize) -> Self {
+        let lg_max_map_size = exact_log2(max_map_size);
+        Self::with_lg_map_sizes(lg_max_map_size, LG_MIN_MAP_SIZE)
+    }
+
+    /// Returns true if the sketch is empty.
+    pub fn is_empty(&self) -> bool {
+        self.hash_map.num_active() == 0
+    }
+
+    /// Returns the number of active items being tracked.
+    pub fn num_active_items(&self) -> usize {
+        self.hash_map.num_active()
+    }
+
+    /// Returns the total weight of the stream.
+    ///
+    /// This is the sum of all counts passed to `update` and 
`update_with_count`.
+    pub fn total_weight(&self) -> i64 {
+        self.stream_weight
+    }
+
+    /// Returns the estimated frequency for an item.
+    ///
+    /// If the item is tracked, this is `item_count + offset`. Otherwise it is 
zero.
+    pub fn estimate(&self, item: &T) -> i64 {
+        let value = self.hash_map.get(item);
+        if value > 0 { value + self.offset } else { 0 }
+    }
+
+    /// Returns the guaranteed lower bound frequency for an item.
+    ///
+    /// This value is never negative and is guaranteed to be no larger than 
the true frequency.
+    /// If the item is not tracked, the lower bound is zero.
+    pub fn lower_bound(&self, item: &T) -> u64 {
+        let value = self.hash_map.get(item);
+        value.max(0) as u64
+    }
+
+    /// Returns the guaranteed upper bound frequency for an item.
+    ///
+    /// This value is guaranteed to be no smaller than the true frequency.
+    /// If the item is tracked, this is `item_count + offset`; otherwise it is `offset`.
+    pub fn upper_bound(&self, item: &T) -> i64 {
+        self.hash_map.get(item) + self.offset
+    }
+
+    /// Returns an upper bound on the maximum error of 
[`FrequentItemsSketch::estimate`]
+    /// for any item.
+    ///
+    /// This is equivalent to the maximum distance between the upper bound and 
the lower bound
+    /// for any item.
+    pub fn maximum_error(&self) -> i64 {
+        self.offset
+    }
+
+    /// Returns epsilon for this sketch.
+    pub fn epsilon(&self) -> f64 {
+        Self::epsilon_for_lg(self.lg_max_map_size)
+    }
+
+    /// Returns epsilon for a sketch configured with `lg_max_map_size`.
+    pub fn epsilon_for_lg(lg_max_map_size: u8) -> f64 {
+        EPSILON_FACTOR / (1u64 << lg_max_map_size) as f64
+    }
+
+    /// Returns the a priori error estimate.
+    pub fn apriori_error(lg_max_map_size: u8, estimated_total_weight: i64) -> 
f64 {
+        Self::epsilon_for_lg(lg_max_map_size) * estimated_total_weight as f64
+    }
+
+    /// Returns the maximum map capacity for this sketch.
+    ///
+    /// This is `0.75 * max_map_size`.
+    pub fn maximum_map_capacity(&self) -> usize {
+        (1usize << self.lg_max_map_size) * LOAD_FACTOR_NUMERATOR / 
LOAD_FACTOR_DENOMINATOR
+    }
+
+    /// Returns the current map capacity.
+    ///
+    /// This is the number of counters supported before resizing or purging.
+    pub fn current_map_capacity(&self) -> usize {
+        self.cur_map_cap
+    }
+
+    /// Returns the configured log2 maximum map size.
+    pub fn lg_max_map_size(&self) -> u8 {
+        self.lg_max_map_size
+    }
+
+    /// Returns the current map size in log2.
+    pub fn lg_cur_map_size(&self) -> u8 {
+        self.hash_map.lg_length()
+    }
+
+    /// Updates the sketch with a count of one.
+    pub fn update(&mut self, item: T) {
+        self.update_with_count(item, 1);
+    }
+
+    /// Updates the sketch with an item and count.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `count` is negative.
+    ///
+    /// A count of zero is a no-op.
+    pub fn update_with_count(&mut self, item: T, count: i64) {
+        if count == 0 {
+            return;
+        }
+        assert!(count > 0, "count may not be negative");
+        self.stream_weight += count;
+        self.hash_map.adjust_or_put_value(item, count);
+        self.maybe_resize_or_purge();
+    }
+
+    /// Merges another sketch into this one.
+    ///
+    /// The other sketch may have a different map size. The merged sketch 
respects the
+    /// larger error tolerance of the inputs.
+    pub fn merge(&mut self, other: &Self)
+    where
+        T: Clone,
+    {
+        if other.is_empty() {
+            return;
+        }
+        let merged_total = self.stream_weight + other.stream_weight;
+        for (item, count) in other.hash_map.iter() {
+            self.update_with_count(item.clone(), count);
+        }
+        self.offset += other.offset;
+        self.stream_weight = merged_total;
+    }
+
+    /// Resets the sketch to an empty state.
+    pub fn reset(&mut self) {
+        *self = Self::with_lg_map_sizes(self.lg_max_map_size, LG_MIN_MAP_SIZE);
+    }
+
+    /// Returns frequent items using the sketch maximum error as threshold.
+    ///
+    /// This is equivalent to 
`frequent_items_with_threshold(error_type, self.maximum_error())`.
+    pub fn frequent_items(&self, error_type: ErrorType) -> Vec<Row<T>>
+    where
+        T: Clone,
+    {
+        self.frequent_items_with_threshold(error_type, self.offset)
+    }
+
+    /// Returns frequent items using a custom threshold.
+    ///
+    /// If `threshold` is less than `maximum_error`, `maximum_error` is used 
instead.
+    ///
+    /// For [`ErrorType::NoFalseNegatives`], items are included when 
`upper_bound > threshold`.
+    /// For [`ErrorType::NoFalsePositives`], items are included when 
`lower_bound > threshold`.
+    pub fn frequent_items_with_threshold(
+        &self,
+        error_type: ErrorType,
+        threshold: i64,
+    ) -> Vec<Row<T>>
+    where
+        T: Clone,
+    {
+        let threshold = threshold.max(self.offset);
+        let mut rows = Vec::new();
+        for (item, count) in self.hash_map.iter() {
+            let lower = count;
+            let upper = count + self.offset;
+            let include = match error_type {
+                ErrorType::NoFalseNegatives => upper > threshold,
+                ErrorType::NoFalsePositives => lower > threshold,
+            };
+            if include {
+                rows.push(Row {
+                    item: item.clone(),
+                    estimate: upper,
+                    upper_bound: upper,
+                    lower_bound: lower.max(0) as u64,
+                });
+            }
+        }
+        rows.sort_by(|a, b| b.estimate.cmp(&a.estimate));
+        rows
+    }
+
+    /// Grows the map, or purges it once it is at maximum size, when the number
+    /// of active items exceeds the current capacity.
+    fn maybe_resize_or_purge(&mut self) {
+        if self.hash_map.num_active() <= self.cur_map_cap {
+            return;
+        }
+        // Still allowed to grow: double the table and refresh the cached capacity.
+        if self.hash_map.lg_length() < self.lg_max_map_size {
+            let doubled = self.hash_map.len() * 2;
+            self.hash_map.resize(doubled);
+            self.cur_map_cap = self.hash_map.capacity();
+            return;
+        }
+        // Already at maximum size: purge and add the returned amount to the offset.
+        self.offset += self.hash_map.purge(self.sample_size);
+        if self.hash_map.num_active() > self.maximum_map_capacity() {
+            panic!("purge did not reduce number of active items");
+        }
+    }
+
+    /// Builds an empty sketch from log2 map sizes.
+    ///
+    /// Both exponents are raised to `LG_MIN_MAP_SIZE` when smaller.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the (clamped) current size exceeds the (clamped) maximum size.
+    fn with_lg_map_sizes(lg_max_map_size: u8, lg_cur_map_size: u8) -> Self {
+        let lg_max_map_size = lg_max_map_size.max(LG_MIN_MAP_SIZE);
+        let lg_cur_map_size = lg_cur_map_size.max(LG_MIN_MAP_SIZE);
+        assert!(
+            lg_cur_map_size <= lg_max_map_size,
+            "lg_cur_map_size must not exceed lg_max_map_size"
+        );
+        let hash_map = ReversePurgeItemHashMap::new(1usize << lg_cur_map_size);
+        let cur_map_cap = hash_map.capacity();
+        // Capacity of a map at maximum size; the purge sample size never exceeds it.
+        let largest_capacity =
+            (1usize << lg_max_map_size) * LOAD_FACTOR_NUMERATOR / LOAD_FACTOR_DENOMINATOR;
+        Self {
+            lg_max_map_size,
+            cur_map_cap,
+            offset: 0,
+            stream_weight: 0,
+            sample_size: SAMPLE_SIZE.min(largest_capacity),
+            hash_map,
+        }
+    }
+
+    /// Serializes the sketch, delegating item encoding to `serialize_items`.
+    ///
+    /// An empty sketch is written as a single 8-byte preamble carrying
+    /// `EMPTY_FLAG_MASK`. Otherwise the output is `PREAMBLE_LONGS_NONEMPTY`
+    /// 8-byte preamble longs, then one 8-byte count per active item, then the
+    /// bytes produced by `serialize_items` for the active keys.
+    fn serialize_inner(&self, serialize_items: fn(&[T]) -> Vec<u8>) -> Vec<u8>
+    where
+        T: Clone,
+    {
+        if self.is_empty() {
+            // Empty form: header bytes only, with the empty flag set.
+            let mut out = vec![0u8; 8];
+            out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_EMPTY;
+            out[SER_VER_BYTE] = SER_VER;
+            out[FAMILY_BYTE] = FAMILY_ID;
+            out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size;
+            out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.lg_length();
+            out[FLAGS_BYTE] = EMPTY_FLAG_MASK;
+            return out;
+        }
+        let active_items = self.num_active_items();
+        // NOTE(review): values and keys are fetched separately and written as
+        // parallel sequences; this assumes both come back in the same order --
+        // confirm against `ReversePurgeItemHashMap`.
+        let values = self.hash_map.active_values();
+        let keys = self.hash_map.active_keys();
+        let items_bytes = serialize_items(&keys);
+        let total_bytes =
+            PREAMBLE_LONGS_NONEMPTY as usize * 8 + (active_items * 8) + items_bytes.len();
+        let mut out = vec![0u8; total_bytes];
+        out[PREAMBLE_LONGS_BYTE] = PREAMBLE_LONGS_NONEMPTY;
+        out[SER_VER_BYTE] = SER_VER;
+        out[FAMILY_BYTE] = FAMILY_ID;
+        out[LG_MAX_MAP_SIZE_BYTE] = self.lg_max_map_size;
+        out[LG_CUR_MAP_SIZE_BYTE] = self.hash_map.lg_length();
+        out[FLAGS_BYTE] = 0;
+        // NOTE(review): `as u32` would silently truncate an item count above
+        // `u32::MAX`.
+        write_u32_le(&mut out, ACTIVE_ITEMS_INT, active_items as u32);
+        write_i64_le(&mut out, STREAM_WEIGHT_LONG, self.stream_weight);
+        write_i64_le(&mut out, OFFSET_LONG, self.offset);
+
+        // Counts start right after the preamble, 8 bytes each.
+        let mut offset = PREAMBLE_LONGS_NONEMPTY as usize * 8;
+        for value in values {
+            write_i64_le(&mut out, offset, value);
+            offset += 8;
+        }
+        // The serialized items fill the remainder of the buffer.
+        out[offset..offset + items_bytes.len()].copy_from_slice(&items_bytes);
+        out
+    }
+
+    /// Rebuilds a sketch from its serialized bytes.
+    ///
+    /// `deserialize_items` decodes the requested number of items from the bytes
+    /// that follow the counts and returns them together with the number of
+    /// bytes it consumed.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error, rather than panicking, when:
+    /// - the buffer is shorter than the preamble, the counts, or the items;
+    /// - the serial version, family id, or preamble-longs value is unexpected;
+    /// - `lg_cur_map_size` exceeds `lg_max_map_size`, or `lg_max_map_size` is
+    ///   not a valid shift amount for `usize`;
+    /// - a stored count is negative;
+    /// - the number of decoded items differs from the declared active items;
+    /// - `deserialize_items` itself fails.
+    fn deserialize_inner(
+        bytes: &[u8],
+        deserialize_items: DeserializeItems<T>,
+    ) -> Result<Self, Error> {
+        if bytes.len() < 8 {
+            return Err(Error::insufficient_data("preamble"));
+        }
+        // Only the low six bits of the first byte carry the preamble-longs value.
+        let pre_longs = bytes[PREAMBLE_LONGS_BYTE] & 0x3f;
+        let ser_ver = bytes[SER_VER_BYTE];
+        let family = bytes[FAMILY_BYTE];
+        let lg_max = bytes[LG_MAX_MAP_SIZE_BYTE];
+        let lg_cur = bytes[LG_CUR_MAP_SIZE_BYTE];
+        let flags = bytes[FLAGS_BYTE];
+        let is_empty = (flags & EMPTY_FLAG_MASK) != 0;
+        if ser_ver != SER_VER {
+            return Err(Error::unsupported_serial_version(SER_VER, ser_ver));
+        }
+        if family != FAMILY_ID {
+            return Err(Error::invalid_family(
+                FAMILY_ID,
+                family,
+                "FrequentItemsSketch",
+            ));
+        }
+        if lg_cur > lg_max {
+            return Err(Error::deserial("lg_cur_map_size exceeds lg_max_map_size"));
+        }
+        // `with_lg_map_sizes` evaluates `1usize << lg`; an exponent taken from
+        // untrusted bytes must be a valid shift amount or that shift overflows.
+        // `lg_cur <= lg_max` was checked above, so this bounds both values.
+        if u32::from(lg_max) >= usize::BITS {
+            return Err(Error::deserial("lg_max_map_size is too large"));
+        }
+        if is_empty {
+            if pre_longs != PREAMBLE_LONGS_EMPTY {
+                return Err(Error::invalid_preamble_longs(
+                    PREAMBLE_LONGS_EMPTY,
+                    pre_longs,
+                ));
+            }
+            return Ok(Self::with_lg_map_sizes(lg_max, lg_cur));
+        }
+        if pre_longs != PREAMBLE_LONGS_NONEMPTY {
+            return Err(Error::invalid_preamble_longs(
+                PREAMBLE_LONGS_NONEMPTY,
+                pre_longs,
+            ));
+        }
+        if bytes.len() < PREAMBLE_LONGS_NONEMPTY as usize * 8 {
+            return Err(Error::insufficient_data("full preamble"));
+        }
+        let active_items = read_u32_le(bytes, ACTIVE_ITEMS_INT) as usize;
+        let stream_weight = read_i64_le(bytes, STREAM_WEIGHT_LONG);
+        let offset_val = read_i64_le(bytes, OFFSET_LONG);
+        let values_offset = PREAMBLE_LONGS_NONEMPTY as usize * 8;
+        let values_bytes = active_items
+            .checked_mul(8)
+            .ok_or_else(|| Error::deserial("values size overflow"))?;
+        // Checked as well: on narrow targets the sum can wrap even when the
+        // multiplication above did not.
+        let items_offset = values_offset
+            .checked_add(values_bytes)
+            .ok_or_else(|| Error::deserial("values size overflow"))?;
+        if bytes.len() < items_offset {
+            return Err(Error::insufficient_data("values"));
+        }
+        let mut values = Vec::with_capacity(active_items);
+        for i in 0..active_items {
+            let value = read_i64_le(bytes, values_offset + i * 8);
+            // `update_with_count` panics on a negative count, so reject
+            // corrupt input here with an error instead.
+            if value < 0 {
+                return Err(Error::deserial("negative item count"));
+            }
+            values.push(value);
+        }
+        let (items, consumed) = deserialize_items(&bytes[items_offset..], active_items)?;
+        if items.len() != active_items {
+            return Err(Error::deserial(
+                "item count mismatch during deserialization",
+            ));
+        }
+        if consumed > bytes.len() - items_offset {
+            return Err(Error::insufficient_data("items"));
+        }
+        let mut sketch = Self::with_lg_map_sizes(lg_max, lg_cur);
+        for (item, value) in items.into_iter().zip(values) {
+            sketch.update_with_count(item, value);
+        }
+        // Restore the recorded totals last, overwriting whatever the replay
+        // above accumulated.
+        sketch.stream_weight = stream_weight;
+        sketch.offset = offset_val;
+        Ok(sketch)
+    }
+}
+
+impl FrequentItemsSketch<i64> {
+    /// Serializes this sketch into a byte vector.
+    ///
+    /// Items are encoded with `serialize_i64_items`.
+    pub fn serialize(&self) -> Vec<u8> {
+        self.serialize_inner(serialize_i64_items)
+    }
+
+    /// Deserializes a sketch from bytes using the selected serializer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::InvalidArgument`] error if `serde` is
+    /// `ItemsSerde::String`, since it does not match the `i64` item type.
+    /// Otherwise any error from decoding `bytes` is returned unchanged.
+    pub fn deserialize(bytes: &[u8], serde: ItemsSerde) -> Result<Self, Error> {
+        match serde {
+            ItemsSerde::Int64 => Self::deserialize_inner(bytes, deserialize_i64_items),
+            ItemsSerde::String => Err(Error::new(
+                ErrorKind::InvalidArgument,
+                "ItemsSerde::String cannot deserialize i64 items",
+            )),
+        }
+    }
+}
+
+impl FrequentItemsSketch<String> {
+    /// Serializes this sketch into a byte vector.
+    ///
+    /// Items are encoded with `serialize_string_items`.
+    pub fn serialize(&self) -> Vec<u8> {
+        self.serialize_inner(serialize_string_items)
+    }
+
+    /// Deserializes a sketch from bytes using the selected serializer.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`ErrorKind::InvalidArgument`] error if `serde` is
+    /// `ItemsSerde::Int64`, since it does not match the `String` item type.
+    /// Otherwise any error from decoding `bytes` is returned unchanged.
+    pub fn deserialize(bytes: &[u8], serde: ItemsSerde) -> Result<Self, Error> {
+        match serde {
+            ItemsSerde::String => Self::deserialize_inner(bytes, deserialize_string_items),
+            ItemsSerde::Int64 => Err(Error::new(
+                ErrorKind::InvalidArgument,
+                "ItemsSerde::Int64 cannot deserialize String items",
+            )),
+        }
+    }
+}
+
+/// Returns the base-2 logarithm of `value`.
+///
+/// # Panics
+///
+/// Panics if `value` is not a power of two.
+fn exact_log2(value: usize) -> u8 {
+    if !value.is_power_of_two() {
+        panic!("value must be power of 2");
+    }
+    // For a power of two, the count of trailing zero bits is the exponent.
+    let exponent = value.trailing_zeros();
+    exponent as u8
+}
diff --git a/datasketches/src/lib.rs b/datasketches/src/lib.rs
index f436565..4d779a0 100644
--- a/datasketches/src/lib.rs
+++ b/datasketches/src/lib.rs
@@ -32,6 +32,7 @@ compile_error!("datasketches does not support big-endian targets");
 
 pub mod countmin;
 pub mod error;
+pub mod frequencies;
 pub mod hll;
 pub mod tdigest;
 pub mod theta;
diff --git a/datasketches/tests/frequencies_serialization_test.rs b/datasketches/tests/frequencies_serialization_test.rs
new file mode 100644
index 0000000..d43b4d1
--- /dev/null
+++ b/datasketches/tests/frequencies_serialization_test.rs
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod common;
+
+use std::fs;
+
+use common::serialization_test_data;
+use datasketches::error::ErrorKind;
+use datasketches::frequencies::FrequentItemsSketch;
+use datasketches::frequencies::ItemsSerde;
+
+#[test]
+fn test_longs_round_trip() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(32);
+    for i in 1..=100 {
+        sketch.update_with_count(i, i);
+    }
+    let serde = ItemsSerde::Int64;
+    let bytes = sketch.serialize();
+    let restored = FrequentItemsSketch::<i64>::deserialize(&bytes, 
serde).unwrap();
+    assert_eq!(restored.total_weight(), sketch.total_weight());
+    assert_eq!(restored.estimate(&42), sketch.estimate(&42));
+    assert_eq!(restored.maximum_error(), sketch.maximum_error());
+}
+
+#[test]
+fn test_items_round_trip() {
+    let mut sketch = FrequentItemsSketch::new(32);
+    sketch.update_with_count("alpha".to_string(), 3);
+    sketch.update_with_count("beta".to_string(), 5);
+    sketch.update_with_count("gamma".to_string(), 7);
+
+    let serde = ItemsSerde::String;
+    let bytes = sketch.serialize();
+    let restored = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+    assert_eq!(restored.total_weight(), sketch.total_weight());
+    assert_eq!(restored.estimate(&"beta".to_string()), 5);
+    assert_eq!(restored.maximum_error(), sketch.maximum_error());
+}
+
+#[test]
+fn test_java_frequent_longs_compatibility() {
+    let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000];
+    let serde = ItemsSerde::Int64;
+    for n in test_cases {
+        let filename = format!("frequent_long_n{}_java.sk", n);
+        let path = serialization_test_data("java_generated_files", &filename);
+        let bytes = fs::read(&path).unwrap();
+        let sketch = FrequentItemsSketch::<i64>::deserialize(&bytes, 
serde).unwrap();
+        assert_eq!(sketch.is_empty(), n == 0);
+        if n > 10 {
+            assert!(sketch.maximum_error() > 0);
+        } else {
+            assert_eq!(sketch.maximum_error(), 0);
+        }
+        assert_eq!(sketch.total_weight(), n as i64);
+    }
+}
+
+#[test]
+fn test_java_frequent_strings_ascii() {
+    let path = serialization_test_data("java_generated_files", 
"frequent_string_ascii_java.sk");
+    let bytes = fs::read(&path).unwrap();
+    let serde = ItemsSerde::String;
+    let sketch = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.maximum_error(), 0);
+    assert_eq!(sketch.total_weight(), 10);
+    assert_eq!(
+        sketch.estimate(&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
+        1
+    );
+    assert_eq!(
+        sketch.estimate(&"bbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()),
+        2
+    );
+    assert_eq!(
+        sketch.estimate(&"ccccccccccccccccccccccccccccc".to_string()),
+        3
+    );
+    assert_eq!(
+        sketch.estimate(&"ddddddddddddddddddddddddddddd".to_string()),
+        4
+    );
+}
+
+#[test]
+fn test_java_frequent_strings_utf8() {
+    let path = serialization_test_data("java_generated_files", 
"frequent_string_utf8_java.sk");
+    let bytes = fs::read(&path).unwrap();
+    let serde = ItemsSerde::String;
+    let sketch = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.maximum_error(), 0);
+    assert_eq!(sketch.total_weight(), 28);
+    assert_eq!(sketch.estimate(&"абвгд".to_string()), 1);
+    assert_eq!(sketch.estimate(&"еёжзи".to_string()), 2);
+    assert_eq!(sketch.estimate(&"йклмн".to_string()), 3);
+    assert_eq!(sketch.estimate(&"опрст".to_string()), 4);
+    assert_eq!(sketch.estimate(&"уфхцч".to_string()), 5);
+    assert_eq!(sketch.estimate(&"шщъыь".to_string()), 6);
+    assert_eq!(sketch.estimate(&"эюя".to_string()), 7);
+}
+
+#[test]
+fn test_cpp_frequent_longs_compatibility() {
+    let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000];
+    let serde = ItemsSerde::Int64;
+    for n in test_cases {
+        let filename = format!("frequent_long_n{}_cpp.sk", n);
+        let path = serialization_test_data("cpp_generated_files", &filename);
+        let bytes = fs::read(&path).unwrap();
+        let sketch = FrequentItemsSketch::<i64>::deserialize(&bytes, serde);
+        if cfg!(windows) {
+            if let Err(err) = sketch {
+                assert_eq!(err.kind(), ErrorKind::InvalidData);
+                assert!(
+                    err.message().contains("insufficient data"),
+                    "expected insufficient data error, got: {err}"
+                );
+                continue;
+            }
+        }
+        let sketch = sketch.unwrap();
+        assert_eq!(sketch.is_empty(), n == 0);
+        if n > 10 {
+            assert!(sketch.maximum_error() > 0);
+        } else {
+            assert_eq!(sketch.maximum_error(), 0);
+        }
+        assert_eq!(sketch.total_weight(), n as i64);
+    }
+}
+
+#[test]
+fn test_cpp_frequent_strings_compatibility() {
+    let test_cases = [0, 1, 10, 100, 1000, 10000, 100000, 1000000];
+    for n in test_cases {
+        let filename = format!("frequent_string_n{}_cpp.sk", n);
+        let path = serialization_test_data("cpp_generated_files", &filename);
+        let bytes = fs::read(&path).unwrap();
+        let serde = ItemsSerde::String;
+        let sketch = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+        assert_eq!(sketch.is_empty(), n == 0);
+        if n > 10 {
+            assert!(sketch.maximum_error() > 0);
+        } else {
+            assert_eq!(sketch.maximum_error(), 0);
+        }
+        assert_eq!(sketch.total_weight(), n as i64);
+    }
+}
+
+#[test]
+fn test_cpp_frequent_strings_ascii() {
+    let path = serialization_test_data("cpp_generated_files", 
"frequent_string_ascii_cpp.sk");
+    let bytes = fs::read(&path).unwrap();
+    let serde = ItemsSerde::String;
+    let sketch = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.maximum_error(), 0);
+    assert_eq!(sketch.total_weight(), 10);
+    assert_eq!(
+        sketch.estimate(&"aaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string()),
+        1
+    );
+    assert_eq!(
+        sketch.estimate(&"bbbbbbbbbbbbbbbbbbbbbbbbbbbbb".to_string()),
+        2
+    );
+    assert_eq!(
+        sketch.estimate(&"ccccccccccccccccccccccccccccc".to_string()),
+        3
+    );
+    assert_eq!(
+        sketch.estimate(&"ddddddddddddddddddddddddddddd".to_string()),
+        4
+    );
+}
+
+#[test]
+fn test_cpp_frequent_strings_utf8() {
+    let path = serialization_test_data("cpp_generated_files", 
"frequent_string_utf8_cpp.sk");
+    let bytes = fs::read(&path).unwrap();
+    let serde = ItemsSerde::String;
+    let sketch = FrequentItemsSketch::<String>::deserialize(&bytes, 
serde).unwrap();
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.maximum_error(), 0);
+    assert_eq!(sketch.total_weight(), 28);
+    assert_eq!(sketch.estimate(&"абвгд".to_string()), 1);
+    assert_eq!(sketch.estimate(&"еёжзи".to_string()), 2);
+    assert_eq!(sketch.estimate(&"йклмн".to_string()), 3);
+    assert_eq!(sketch.estimate(&"опрст".to_string()), 4);
+    assert_eq!(sketch.estimate(&"уфхцч".to_string()), 5);
+    assert_eq!(sketch.estimate(&"шщъыь".to_string()), 6);
+    assert_eq!(sketch.estimate(&"эюя".to_string()), 7);
+}
diff --git a/datasketches/tests/frequencies_update_test.rs b/datasketches/tests/frequencies_update_test.rs
new file mode 100644
index 0000000..8947793
--- /dev/null
+++ b/datasketches/tests/frequencies_update_test.rs
@@ -0,0 +1,506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datasketches::frequencies::ErrorType;
+use datasketches::frequencies::FrequentItemsSketch;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct TestItem(i32);
+
+#[test]
+fn test_longs_update_with_zero_count_is_noop() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, 0);
+
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 0);
+    assert_eq!(sketch.num_active_items(), 0);
+}
+
+#[test]
+fn test_items_update_with_zero_count_is_noop() {
+    let mut sketch = FrequentItemsSketch::new(8);
+    sketch.update_with_count("a".to_string(), 0);
+
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 0);
+    assert_eq!(sketch.num_active_items(), 0);
+}
+
+#[test]
+fn test_capacity_and_epsilon_helpers() {
+    let longs: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    assert_eq!(longs.current_map_capacity(), 6);
+    assert_eq!(longs.maximum_map_capacity(), 6);
+    assert_eq!(longs.lg_cur_map_size(), 3);
+    assert_eq!(longs.lg_max_map_size(), 3);
+
+    let epsilon = FrequentItemsSketch::<i64>::epsilon_for_lg(10);
+    let expected = 3.5 / 1024.0;
+    assert!((epsilon - expected).abs() < 1e-12);
+
+    let apriori = FrequentItemsSketch::<i64>::apriori_error(10, 10_000);
+    assert!((apriori - expected * 10_000.0).abs() < 1e-9);
+
+    let items: FrequentItemsSketch<i32> = FrequentItemsSketch::new(1024);
+    assert!((items.epsilon() - expected).abs() < 1e-12);
+    assert_eq!(items.current_map_capacity(), 6);
+    assert_eq!(items.maximum_map_capacity(), 768);
+    assert_eq!(items.lg_max_map_size(), 10);
+}
+
+#[test]
+fn test_longs_empty() {
+    let sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.num_active_items(), 0);
+    assert_eq!(sketch.total_weight(), 0);
+    assert_eq!(sketch.estimate(&42), 0);
+    assert_eq!(sketch.lower_bound(&42), 0);
+    assert_eq!(sketch.upper_bound(&42), 0);
+    assert_eq!(sketch.maximum_error(), 0);
+}
+
+#[test]
+fn test_items_empty() {
+    let sketch: FrequentItemsSketch<String> = FrequentItemsSketch::new(8);
+    let item = "a".to_string();
+
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.num_active_items(), 0);
+    assert_eq!(sketch.total_weight(), 0);
+    assert_eq!(sketch.estimate(&item), 0);
+    assert_eq!(sketch.lower_bound(&item), 0);
+    assert_eq!(sketch.upper_bound(&item), 0);
+    assert_eq!(sketch.maximum_error(), 0);
+}
+
+#[test]
+fn test_longs_one_item() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update(10);
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.num_active_items(), 1);
+    assert_eq!(sketch.total_weight(), 1);
+    assert_eq!(sketch.estimate(&10), 1);
+    assert_eq!(sketch.lower_bound(&10), 1);
+    assert_eq!(sketch.upper_bound(&10), 1);
+}
+
+#[test]
+fn test_items_one_item() {
+    let mut sketch = FrequentItemsSketch::new(8);
+    let item = "a".to_string();
+    sketch.update(item.clone());
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.num_active_items(), 1);
+    assert_eq!(sketch.total_weight(), 1);
+    assert_eq!(sketch.estimate(&item), 1);
+    assert_eq!(sketch.lower_bound(&item), 1);
+    assert_eq!(sketch.upper_bound(&item), 1);
+}
+
+#[test]
+fn test_longs_several_items_no_resize_no_purge() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update(1);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(4);
+    sketch.update(2);
+    sketch.update(3);
+    sketch.update(2);
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 7);
+    assert_eq!(sketch.num_active_items(), 4);
+    assert_eq!(sketch.estimate(&1), 1);
+    assert_eq!(sketch.estimate(&2), 3);
+    assert_eq!(sketch.estimate(&3), 2);
+    assert_eq!(sketch.estimate(&4), 1);
+    assert_eq!(sketch.maximum_error(), 0);
+}
+
+#[test]
+fn test_items_several_items_no_resize_no_purge() {
+    let mut sketch = FrequentItemsSketch::new(8);
+    let a = "a".to_string();
+    let b = "b".to_string();
+    let c = "c".to_string();
+    let d = "d".to_string();
+    sketch.update(a.clone());
+    sketch.update(b.clone());
+    sketch.update(c.clone());
+    sketch.update(d.clone());
+    sketch.update(b.clone());
+    sketch.update(c.clone());
+    sketch.update(b.clone());
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 7);
+    assert_eq!(sketch.num_active_items(), 4);
+    assert_eq!(sketch.estimate(&a), 1);
+    assert_eq!(sketch.estimate(&b), 3);
+    assert_eq!(sketch.estimate(&c), 2);
+    assert_eq!(sketch.estimate(&d), 1);
+    assert_eq!(sketch.maximum_error(), 0);
+
+    let rows = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows.len(), 4);
+
+    let rows = 
sketch.frequent_items_with_threshold(ErrorType::NoFalsePositives, 2);
+    assert_eq!(rows.len(), 1);
+    assert_eq!(rows[0].item(), &b);
+
+    sketch.reset();
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.num_active_items(), 0);
+    assert_eq!(sketch.total_weight(), 0);
+}
+
+#[test]
+fn test_items_several_items_with_resize_no_purge() {
+    let mut sketch = FrequentItemsSketch::new(16);
+    let a = "a".to_string();
+    let b = "b".to_string();
+    let c = "c".to_string();
+    let d = "d".to_string();
+    sketch.update(a.clone());
+    sketch.update(b.clone());
+    sketch.update(c.clone());
+    sketch.update(d.clone());
+    sketch.update(b.clone());
+    sketch.update(c.clone());
+    sketch.update(b.clone());
+    for item in ["e", "f", "g", "h", "i", "j", "k", "l"] {
+        sketch.update(item.to_string());
+    }
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 15);
+    assert_eq!(sketch.num_active_items(), 12);
+    assert_eq!(sketch.estimate(&a), 1);
+    assert_eq!(sketch.estimate(&b), 3);
+    assert_eq!(sketch.estimate(&c), 2);
+    assert_eq!(sketch.estimate(&d), 1);
+    assert_eq!(sketch.maximum_error(), 0);
+}
+
+#[test]
+fn test_longs_estimation_mode() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, 10);
+    for item in 2..=6 {
+        sketch.update(item);
+    }
+    sketch.update_with_count(7, 15);
+    for item in 8..=12 {
+        sketch.update(item);
+    }
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 35);
+    assert!(sketch.maximum_error() > 0);
+
+    let items = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(items.len(), 2);
+    assert_eq!(items[0].item(), &7);
+    assert_eq!(items[0].estimate(), 15);
+    assert_eq!(items[1].item(), &1);
+    assert_eq!(items[1].estimate(), 10);
+
+    let items = sketch.frequent_items(ErrorType::NoFalseNegatives);
+    assert!(items.len() >= 2);
+    assert!(items.len() <= 12);
+}
+
+#[test]
+fn test_items_estimation_mode() {
+    let mut sketch: FrequentItemsSketch<i32> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, 10);
+    for item in 2..=6 {
+        sketch.update(item);
+    }
+    sketch.update_with_count(7, 15);
+    for item in 8..=12 {
+        sketch.update(item);
+    }
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 35);
+    assert!(sketch.maximum_error() > 0);
+
+    let items = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(items.len(), 2);
+    assert_eq!(items[0].item(), &7);
+    assert_eq!(items[0].estimate(), 15);
+    assert_eq!(items[1].item(), &1);
+    assert_eq!(items[1].estimate(), 10);
+
+    let items = sketch.frequent_items(ErrorType::NoFalseNegatives);
+    assert!(items.len() >= 2);
+    assert!(items.len() <= 12);
+}
+
+#[test]
+fn test_longs_purge_keeps_heavy_hitters() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, 10);
+    for item in 2..=7 {
+        sketch.update(item);
+    }
+
+    assert_eq!(sketch.total_weight(), 16);
+    assert_eq!(sketch.maximum_error(), 1);
+    assert_eq!(sketch.estimate(&1), 10);
+    assert_eq!(sketch.lower_bound(&1), 9);
+
+    let rows = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows.len(), 1);
+    assert_eq!(rows[0].item(), &1);
+    assert_eq!(rows[0].estimate(), 10);
+}
+
+#[test]
+fn test_items_purge_keeps_heavy_hitters() {
+    let mut sketch = FrequentItemsSketch::new(8);
+    sketch.update_with_count("a".to_string(), 10);
+    for item in ["b", "c", "d", "e", "f", "g"] {
+        sketch.update(item.to_string());
+    }
+
+    assert_eq!(sketch.total_weight(), 16);
+    assert_eq!(sketch.maximum_error(), 1);
+    assert_eq!(sketch.estimate(&"a".to_string()), 10);
+    assert_eq!(sketch.lower_bound(&"a".to_string()), 9);
+
+    let rows = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows.len(), 1);
+    assert_eq!(rows[0].item(), "a");
+    assert_eq!(rows[0].estimate(), 10);
+}
+
+#[test]
+fn test_items_custom_type() {
+    let mut sketch: FrequentItemsSketch<TestItem> = 
FrequentItemsSketch::new(8);
+    sketch.update_with_count(TestItem(1), 10);
+    for item in 2..=7 {
+        sketch.update(TestItem(item));
+    }
+    let item = TestItem(8);
+    sketch.update(item);
+
+    assert!(!sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 17);
+    assert_eq!(sketch.estimate(&TestItem(1)), 10);
+
+    let rows = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows.len(), 1);
+    assert_eq!(rows[0].item(), &TestItem(1));
+    assert_eq!(rows[0].estimate(), 10);
+}
+
+#[test]
+fn test_longs_merge_estimation_mode() {
+    let mut sketch1: FrequentItemsSketch<i64> = FrequentItemsSketch::new(16);
+    sketch1.update_with_count(1, 9);
+    for item in 2..=14 {
+        sketch1.update(item);
+    }
+    assert!(sketch1.maximum_error() > 0);
+
+    let mut sketch2: FrequentItemsSketch<i64> = FrequentItemsSketch::new(16);
+    for item in 8..=20 {
+        sketch2.update(item);
+    }
+    sketch2.update_with_count(21, 11);
+    assert!(sketch2.maximum_error() > 0);
+
+    sketch1.merge(&sketch2);
+    assert!(!sketch1.is_empty());
+    assert_eq!(sketch1.total_weight(), 46);
+    assert!(sketch1.num_active_items() >= 2);
+
+    let items = 
sketch1.frequent_items_with_threshold(ErrorType::NoFalsePositives, 2);
+    assert_eq!(items.len(), 2);
+    assert_eq!(items[0].item(), &21);
+    assert!(items[0].estimate() >= 11);
+    assert_eq!(items[1].item(), &1);
+    assert!(items[1].estimate() >= 9);
+}
+
+#[test]
+fn test_items_merge_estimation_mode() {
+    let mut sketch1: FrequentItemsSketch<i32> = FrequentItemsSketch::new(16);
+    sketch1.update_with_count(1, 9);
+    for item in 2..=14 {
+        sketch1.update(item);
+    }
+    assert!(sketch1.maximum_error() > 0);
+
+    let mut sketch2: FrequentItemsSketch<i32> = FrequentItemsSketch::new(16);
+    for item in 8..=20 {
+        sketch2.update(item);
+    }
+    sketch2.update_with_count(21, 11);
+    assert!(sketch2.maximum_error() > 0);
+
+    sketch1.merge(&sketch2);
+    assert!(!sketch1.is_empty());
+    assert_eq!(sketch1.total_weight(), 46);
+    assert!(sketch1.num_active_items() >= 2);
+
+    let items = 
sketch1.frequent_items_with_threshold(ErrorType::NoFalsePositives, 2);
+    assert_eq!(items.len(), 2);
+    assert_eq!(items[0].item(), &21);
+    assert!(items[0].estimate() >= 11);
+    assert_eq!(items[1].item(), &1);
+    assert!(items[1].estimate() >= 9);
+}
+
+#[test]
+fn test_longs_merge_exact_mode() {
+    let mut sketch1: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch1.update(1);
+    sketch1.update(2);
+    sketch1.update(2);
+
+    let mut sketch2: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch2.update(2);
+    sketch2.update(3);
+
+    sketch1.merge(&sketch2);
+
+    assert!(!sketch1.is_empty());
+    assert_eq!(sketch1.total_weight(), 5);
+    assert_eq!(sketch1.num_active_items(), 3);
+    assert_eq!(sketch1.estimate(&1), 1);
+    assert_eq!(sketch1.estimate(&2), 3);
+    assert_eq!(sketch1.estimate(&3), 1);
+    assert_eq!(sketch1.maximum_error(), 0);
+}
+
+#[test]
+fn test_items_merge_exact_mode() {
+    let mut sketch1 = FrequentItemsSketch::new(8);
+    let a = "a".to_string();
+    let b = "b".to_string();
+    let c = "c".to_string();
+    sketch1.update(a.clone());
+    sketch1.update(b.clone());
+    sketch1.update(b.clone());
+
+    let mut sketch2 = FrequentItemsSketch::new(8);
+    sketch2.update(b.clone());
+    sketch2.update(c.clone());
+
+    sketch1.merge(&sketch2);
+
+    assert!(!sketch1.is_empty());
+    assert_eq!(sketch1.total_weight(), 5);
+    assert_eq!(sketch1.num_active_items(), 3);
+    assert_eq!(sketch1.estimate(&a), 1);
+    assert_eq!(sketch1.estimate(&b), 3);
+    assert_eq!(sketch1.estimate(&c), 1);
+    assert_eq!(sketch1.maximum_error(), 0);
+}
+
+#[test]
+fn test_longs_merge_empty_is_noop() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update(1);
+
+    let empty: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.merge(&empty);
+
+    assert_eq!(sketch.total_weight(), 1);
+    assert_eq!(sketch.num_active_items(), 1);
+    assert_eq!(sketch.estimate(&1), 1);
+}
+
+#[test]
+fn test_items_merge_empty_is_noop() {
+    let mut sketch: FrequentItemsSketch<i32> = FrequentItemsSketch::new(8);
+    sketch.update(1);
+
+    let empty: FrequentItemsSketch<i32> = FrequentItemsSketch::new(8);
+    sketch.merge(&empty);
+
+    assert_eq!(sketch.total_weight(), 1);
+    assert_eq!(sketch.num_active_items(), 1);
+    assert_eq!(sketch.estimate(&1), 1);
+}
+
+#[test]
+fn test_row_equality_changes_with_updates() {
+    let mut sketch: FrequentItemsSketch<i32> = FrequentItemsSketch::new(8);
+    sketch.update(1);
+    let rows1 = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows1.len(), 1);
+    let row1 = rows1[0].clone();
+
+    sketch.update(1);
+    let rows2 = sketch.frequent_items(ErrorType::NoFalsePositives);
+    assert_eq!(rows2.len(), 1);
+    let row2 = rows2[0].clone();
+
+    assert_ne!(row1, row2);
+    assert_eq!(row2.item(), &1);
+    assert_eq!(row2.estimate(), 2);
+}
+
+#[test]
+fn test_longs_reset() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, 3);
+    sketch.update_with_count(2, 2);
+    sketch.reset();
+
+    assert!(sketch.is_empty());
+    assert_eq!(sketch.total_weight(), 0);
+    assert_eq!(sketch.num_active_items(), 0);
+    assert_eq!(sketch.lg_max_map_size(), 3);
+}
+
+#[test]
+#[should_panic(expected = "count may not be negative")]
+fn test_longs_negative_count_panics() {
+    let mut sketch: FrequentItemsSketch<i64> = FrequentItemsSketch::new(8);
+    sketch.update_with_count(1, -1);
+}
+
+#[test]
+#[should_panic(expected = "count may not be negative")]
+fn test_items_negative_count_panics() {
+    let mut sketch = FrequentItemsSketch::new(8);
+    sketch.update_with_count("a".to_string(), -1);
+}
+
+#[test]
+#[should_panic(expected = "value must be power of 2")]
+fn test_longs_invalid_map_size_panics() {
+    FrequentItemsSketch::<i64>::new(6);
+}
+
+#[test]
+#[should_panic(expected = "value must be power of 2")]
+fn test_items_invalid_map_size_panics() {
+    let _ = FrequentItemsSketch::<String>::new(6);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to