This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch compression
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git

commit e63fb9b4f3d136ecd08d5b05c7c304dd76d0f29f
Author: tison <[email protected]>
AuthorDate: Wed Feb 4 17:48:23 2026 +0800

    CompressedState continue
    
    Signed-off-by: tison <[email protected]>
---
 datasketches/src/cpc/compression.rs      | 196 +++++++++++++++++++++++++++++--
 datasketches/src/cpc/compression_data.rs | 193 +++++++++++++++---------------
 2 files changed, 281 insertions(+), 108 deletions(-)

diff --git a/datasketches/src/cpc/compression.rs b/datasketches/src/cpc/compression.rs
index 4d5cd2a..4eb03ce 100644
--- a/datasketches/src/cpc/compression.rs
+++ b/datasketches/src/cpc/compression.rs
@@ -15,11 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::cpc::compression_data::LENGTH_LIMITED_UNARY_ENCODING_TABLE65;
-use crate::cpc::pair_table::{PairTable, introspective_insertion_sort};
-use crate::cpc::{CpcSketch, Flavor};
 use std::cmp::Ordering;
 
+use crate::cpc::CpcSketch;
+use crate::cpc::Flavor;
+use crate::cpc::compression_data::COLUMN_PERMUTATIONS_FOR_ENCODING;
+use crate::cpc::compression_data::ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE;
+use crate::cpc::compression_data::LENGTH_LIMITED_UNARY_ENCODING_TABLE65;
+use crate::cpc::pair_table::PairTable;
+use crate::cpc::pair_table::introspective_insertion_sort;
+
 pub(super) struct CompressedState {
     table_data: Vec<u32>,
     table_data_words: usize,
@@ -34,7 +39,6 @@ impl CompressedState {
         match source.flavor() {
             Flavor::EMPTY => {
                 // do nothing
-                return;
             }
             Flavor::SPARSE => {
                 self.compress_sparse_flavor(source);
@@ -87,7 +91,7 @@ impl CompressedState {
                 let mut byte = source.sliding_window[row_index];
                 while byte != 0 {
                     let col_index = byte.trailing_zeros();
-                    byte = byte ^ (1 << col_index); // erase the 1
+                    byte ^= 1 << col_index; // erase the 1
                     all_pairs[idx] = ((row_index << 6) as u32) | col_index;
                     idx += 1;
                 }
@@ -117,18 +121,63 @@ impl CompressedState {
         self.compress_surprising_values(&all_pairs, source.lg_k());
     }
 
-    fn compress_pinned_flavor(&mut self, source: &CpcSketch) {}
+    fn compress_pinned_flavor(&mut self, source: &CpcSketch) {
+        self.compress_sliding_window(&source.sliding_window, source.lg_k(), 
source.num_coupons());
+        let mut pairs = source.surprising_value_table().unwrapping_get_items();
+        if !pairs.is_empty() {
+            // Here we subtract 8 from the column indices. Because they are stored in the low 6 bits
+            // of each row_col pair, and because no column index is less than 8 for a "Pinned"
+            // sketch, we can simply subtract 8 from the pairs themselves.
+
+            // shift the columns over by 8 positions before compressing (because of the window)
+            for pair in &mut pairs {
+                assert!(*pair & 63 >= 8, "pair column index is less than 8: 
{pair}");
+                *pair -= 8;
+            }
+
+            introspective_insertion_sort(&mut pairs);
+            self.compress_surprising_values(&pairs, source.lg_k());
+        }
+    }
 
-    fn compress_sliding_flavor(&mut self, source: &CpcSketch) {}
+    // Complicated by the existence of both a left fringe and a right fringe.
+    fn compress_sliding_flavor(&mut self, source: &CpcSketch) {
+        self.compress_sliding_window(&source.sliding_window, source.lg_k(), 
source.num_coupons());
+        let mut pairs = source.surprising_value_table().unwrapping_get_items();
+        if !pairs.is_empty() {
+            // Here we apply a complicated transformation to the column indices, which
+            // changes the implied ordering of the pairs, so we must do it before sorting.
+
+            let pseudo_phase = determine_pseudo_phase(source.lg_k(), 
source.num_coupons());
+            let permutation = COLUMN_PERMUTATIONS_FOR_ENCODING[pseudo_phase as 
usize];
+            let offset = source.window_offset;
+            debug_assert!(offset <= 56);
+            for pair in &mut pairs {
+                let row_col = *pair;
+                let row = row_col >> 6;
+                let mut col = (row_col & 63) as u8;
+                // first rotate the columns into a canonical configuration:
+                //  new = ((old - (offset+8)) + 64) mod 64
+                col = (col + 56 - offset) & 63;
+                debug_assert!(col < 56);
+                // then apply the permutation
+                col = permutation[col as usize];
+                *pair = (row << 6) | (col as u32);
+            }
+
+            introspective_insertion_sort(&mut pairs);
+            self.compress_surprising_values(&pairs, source.lg_k());
+        }
+    }
 
     fn compress_surprising_values(&mut self, pairs: &[u32], lg_k: u8) {
         let k = 1 << lg_k;
         let num_pairs = pairs.len() as u32;
         let num_base_bits = golomb_choose_number_of_base_bits(k + num_pairs, 
num_pairs as u64);
         let table_len = safe_length_for_compressed_pair_buf(k, num_pairs, 
num_base_bits);
-        self.table_data.truncate(table_len);
+        self.table_data.resize(table_len, 0);
 
-        let compressed_surprising_values = 
self.low_level_compress_pairs(&pairs, num_base_bits);
+        let compressed_surprising_values = 
self.low_level_compress_pairs(pairs, num_base_bits);
 
         // At this point we could free the unused portion of the compression output buffer,
         // but it is not necessary if it is temporary
@@ -138,6 +187,81 @@ impl CompressedState {
         self.table_num_entries = num_pairs;
     }
 
+    fn compress_sliding_window(&mut self, window: &[u8], lg_k: u8, 
num_coupons: u32) {
+        let k = 1 << lg_k;
+        let window_buf_len = safe_length_for_compressed_window_buf(k);
+        self.window_data.resize(window_buf_len, 0);
+        let pseudo_phase = determine_pseudo_phase(lg_k, num_coupons);
+        let data_words = self.low_level_compress_bytes(
+            window,
+            k,
+            &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[pseudo_phase as usize],
+        );
+
+        // At this point we could free the unused portion of the compression output buffer,
+        // but it is not necessary if it is temporary
+        // Note: realloc caused strange timing spikes for lgK = 11 and 12.
+
+        self.window_data_words = data_words;
+    }
+
+    /// Returns the number of compressed words that were actually used.
+    ///
+    /// It is the caller's responsibility to ensure that the window_data is long enough.
+    fn low_level_compress_bytes(
+        &mut self,
+        byte_array: &[u8],
+        num_bytes_to_encode: u32,
+        encoding_table: &[u16],
+    ) -> usize {
+        // bits are packed into this first, then are flushed to window_data
+        let mut bitbuf = 0;
+        // number of bits currently in bitbuf; must be between 0 and 31
+        let mut bufbits = 0;
+        let mut next_word_index = 0;
+
+        for byte_index in 0..num_bytes_to_encode {
+            let code_info = encoding_table[byte_array[byte_index as usize] as 
usize];
+            let code_val = (code_info & 0xfff) as u64;
+            let code_len = (code_info >> 12) as u8;
+            bitbuf |= code_val << bufbits;
+            bufbits += code_len;
+            maybe_flush_bitbuf(
+                &mut bitbuf,
+                &mut bufbits,
+                &mut self.window_data,
+                &mut next_word_index,
+            );
+        }
+
+        // Pad the bitstream with 11 zero-bits so that the decompressor's 12-bit peek can't overrun
+        // its input.
+        bufbits += 11;
+        maybe_flush_bitbuf(
+            &mut bitbuf,
+            &mut bufbits,
+            &mut self.window_data,
+            &mut next_word_index,
+        );
+
+        if bufbits > 0 {
+            // We are done encoding now, so we flush the bit buffer.
+            debug_assert!(bufbits < 32);
+            self.window_data[next_word_index] = (bitbuf & 0xffffffff) as u32;
+            next_word_index += 1;
+
+            // no need to reset the bit buffer here since it is not used again
+            //bitbuf = 0;
+            //bufbits = 0;
+        }
+
+        next_word_index
+    }
+
+    /// Returns the number of table_data words that were actually used.
+    ///
+    /// Here "pairs" refers to row/column pairs that specify the positions of surprising values in
+    /// the bit matrix.
     fn low_level_compress_pairs(&mut self, pairs: &[u32], num_base_bits: u8) 
-> usize {
         let mut bitbuf = 0;
         let mut bufbits = 0;
@@ -165,9 +289,9 @@ impl CompressedState {
             predicted_col_index = col_index + 1;
 
             let code_info = LENGTH_LIMITED_UNARY_ENCODING_TABLE65[x_delta as 
usize];
-            let code_val = code_info & 0xfff;
+            let code_val = (code_info & 0xfff) as u64;
             let code_len = (code_info >> 12) as u8;
-            bitbuf |= (code_val << bufbits) as u64;
+            bitbuf |= code_val << bufbits;
             bufbits += code_len;
 
             maybe_flush_bitbuf(
@@ -213,7 +337,7 @@ impl CompressedState {
             self.table_data[next_word_index] = (bitbuf & 0xffffffff) as u32;
             next_word_index += 1;
 
-            // not really necessary unset
+            // no need to reset the bit buffer here since it is not used again
             //bitbuf = 0;
             //bufbits = 0;
         }
@@ -227,6 +351,42 @@ pub(super) struct UncompressedState {
     window: Vec<u8>,
 }
 
+/// Returns the (pseudo-)phase used to select encoding tables, given `lg_k` and `num_coupons`.
+///
+/// The threshold comparisons are widened to `u64`: as `u32`, `2375 * k` overflows once
+/// `lg_k >= 21`, and `1000 * num_coupons` overflows above about 4.29 million coupons.
+fn determine_pseudo_phase(lg_k: u8, num_coupons: u32) -> u8 {
+    let k = 1u64 << lg_k;
+    let c = u64::from(num_coupons);
+    // This mid-range logic produces pseudo-phases. They are used to select encoding tables.
+    // The thresholds were chosen by hand after looking at plots of measured compression.
+    if 1000 * c < 2375 * k {
+        // 16..=21 are the mid-range tables
+        if 4 * c < 3 * k {
+            16
+        } else if 10 * c < 11 * k {
+            16 + 1
+        } else if 100 * c < 132 * k {
+            16 + 2
+        } else if 3 * c < 5 * k {
+            16 + 3
+        } else if 1000 * c < 1965 * k {
+            16 + 4
+        } else if 1000 * c < 2275 * k {
+            16 + 5
+        } else {
+            // steady-state table employed before its actual phase
+            6
+        }
+    } else {
+        // This steady-state logic produces true phases. They are used to select
+        // encoding tables, and also column permutations for the "Sliding" flavor.
+        debug_assert!(lg_k >= 4);
+        let tmp = num_coupons >> (lg_k - 4);
+        (tmp & 15) as u8 // phase
+    }
+}
+
 fn write_unary(
     compressed_words: &mut [u32],
     next_word_index: &mut usize,
@@ -265,6 +425,18 @@ fn maybe_flush_bitbuf(
     }
 }
 
+// Explanation of padding: we write
+// 1) xdelta (huffman, provides at least 1 bit, requires 12-bit lookahead)
+// 2) ydeltaGolombHi (unary, provides at least 1 bit, requires 8-bit lookahead)
+// 3) ydeltaGolombLo (straight B bits).
+// So the 12-bit lookahead is the tight constraint, but there are at least (2 + B) bits emitted,
+// so we would be safe with max (0, 10 - B) bits of padding at the end of the bitstream.
+fn safe_length_for_compressed_window_buf(k: u32) -> usize {
+    // 11 bits of padding, due to 12-bit lookahead, with 1 bit certainly present.
+    let bits = 12 * k + 11;
+    divide_longs_rounding_up(bits as usize, 32)
+}
+
 fn safe_length_for_compressed_pair_buf(k: u32, num_pairs: u32, num_base_bits: 
u8) -> usize {
     // Long ybits = k + numPairs; // simpler and safer UB
     // The following tighter UB on ybits is based on page 198
diff --git a/datasketches/src/cpc/compression_data.rs b/datasketches/src/cpc/compression_data.rs
index a8900ba..3c8c464 100644
--- a/datasketches/src/cpc/compression_data.rs
+++ b/datasketches/src/cpc/compression_data.rs
@@ -15,102 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use googletest::assert_that;
-    use googletest::prelude::container_eq;
-
-    #[test]
-    fn decoding_tables() {
-        let length_limited_unary_decoding_table65 =
-            make_decoding_table(&LENGTH_LIMITED_UNARY_ENCODING_TABLE65, 65);
-        validate_decoding_table(
-            &length_limited_unary_decoding_table65,
-            &LENGTH_LIMITED_UNARY_ENCODING_TABLE65,
-        );
-        assert_that!(
-            length_limited_unary_decoding_table65,
-            container_eq(LENGTH_LIMITED_UNARY_DECODING_TABLE65),
-        );
-
-        let mut decoding_tables_for_high_entropy_byte = vec![];
-        for i in 0..(16 + 6) {
-            decoding_tables_for_high_entropy_byte.push(make_decoding_table(
-                &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i],
-                256,
-            ));
-            validate_decoding_table(
-                &decoding_tables_for_high_entropy_byte[i],
-                &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i],
-            );
-        }
-        assert_that!(
-            decoding_tables_for_high_entropy_byte,
-            container_eq(DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE),
-        );
-
-        let mut column_permutations_for_decoding = vec![];
-        for i in 0..16 {
-            column_permutations_for_decoding.push(make_inverse_permutation(
-                &COLUMN_PERMUTATIONS_FOR_ENCODING[i],
-                56,
-            ));
-        }
-        assert_that!(
-            column_permutations_for_decoding,
-            container_eq(COLUMN_PERMUTATIONS_FOR_DECODING),
-        );
-    }
-
-    fn make_decoding_table(encoding_table: &[u16], num_byte_values: u16) -> 
Vec<u16> {
-        assert_eq!(encoding_table.len(), num_byte_values as usize);
-        let mut decoding_table = vec![0; 4096];
-        for byte_value in 0..num_byte_values {
-            let encoding_entry = encoding_table[byte_value as usize];
-            let code_value = encoding_entry & 0xfff;
-            let code_length = encoding_entry >> 12;
-            let decoding_entry = (code_length << 8) | byte_value;
-            let garbage_length = 12 - code_length;
-            let num_copies = 1 << garbage_length;
-            for garbage_bits in 0..num_copies {
-                let extended_code_value = code_value | (garbage_bits << 
code_length);
-                decoding_table[(extended_code_value & 0xfff) as usize] = 
decoding_entry;
-            }
-        }
-        decoding_table
-    }
-
-    fn make_inverse_permutation(perm: &[u8], len: usize) -> Vec<u8> {
-        let mut inverse = vec![0; len];
-        for i in 0..len {
-            inverse[perm[i] as usize] = i as u8;
-        }
-        for i in 0..len {
-            assert_eq!(perm[inverse[i] as usize], i as u8);
-        }
-        inverse
-    }
-
-    fn validate_decoding_table(decoding_table: &[u16], encoding_table: &[u16]) 
{
-        for decode_this in 0..4096 {
-            let tmp_d = decoding_table[decode_this];
-            let decoded_byte = tmp_d & 0xff;
-            let decoded_length = tmp_d >> 8;
-
-            let tmp_e = encoding_table[decoded_byte as usize];
-            let encoded_bit_pattern = tmp_e & 0xfff;
-            let encoded_length = tmp_e >> 12;
-
-            assert_eq!(decoded_length, encoded_length);
-            assert_eq!(
-                encoded_bit_pattern,
-                (decode_this as u16) & ((1 << decoded_length) - 1)
-            );
-        }
-    }
-}
-
 /// Notice that there are only 65 symbols here, which is different from our usual 8->12 coding
 /// scheme which handles 256 symbols.
 pub(super) static LENGTH_LIMITED_UNARY_ENCODING_TABLE65: [u16; 65] = [
@@ -12055,3 +11959,100 @@ pub(super) static DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE: [[u16; 4096]; 22] = [
         515, 1029, 769, 1555, 515, 1289, 775, 2598, 515, 1282, 769, 1831, 515, 
1295, 775, 3327,
     ],
 ];
+
+#[cfg(test)]
+mod tests {
+    use googletest::assert_that;
+    use googletest::prelude::container_eq;
+
+    use super::*;
+
+    #[test]
+    fn decoding_tables() {
+        let length_limited_unary_decoding_table65 =
+            make_decoding_table(&LENGTH_LIMITED_UNARY_ENCODING_TABLE65, 65);
+        validate_decoding_table(
+            &length_limited_unary_decoding_table65,
+            &LENGTH_LIMITED_UNARY_ENCODING_TABLE65,
+        );
+        assert_that!(
+            length_limited_unary_decoding_table65,
+            container_eq(LENGTH_LIMITED_UNARY_DECODING_TABLE65),
+        );
+
+        let mut decoding_tables_for_high_entropy_byte = vec![];
+        for i in 0..(16 + 6) {
+            decoding_tables_for_high_entropy_byte.push(make_decoding_table(
+                &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i],
+                256,
+            ));
+            validate_decoding_table(
+                &decoding_tables_for_high_entropy_byte[i],
+                &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i],
+            );
+        }
+        assert_that!(
+            decoding_tables_for_high_entropy_byte,
+            container_eq(DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE),
+        );
+
+        let mut column_permutations_for_decoding = vec![];
+        for i in 0..16 {
+            column_permutations_for_decoding.push(make_inverse_permutation(
+                &COLUMN_PERMUTATIONS_FOR_ENCODING[i],
+                56,
+            ));
+        }
+        assert_that!(
+            column_permutations_for_decoding,
+            container_eq(COLUMN_PERMUTATIONS_FOR_DECODING),
+        );
+    }
+
+    fn make_decoding_table(encoding_table: &[u16], num_byte_values: u16) -> 
Vec<u16> {
+        assert_eq!(encoding_table.len(), num_byte_values as usize);
+        let mut decoding_table = vec![0; 4096];
+        for byte_value in 0..num_byte_values {
+            let encoding_entry = encoding_table[byte_value as usize];
+            let code_value = encoding_entry & 0xfff;
+            let code_length = encoding_entry >> 12;
+            let decoding_entry = (code_length << 8) | byte_value;
+            let garbage_length = 12 - code_length;
+            let num_copies = 1 << garbage_length;
+            for garbage_bits in 0..num_copies {
+                let extended_code_value = code_value | (garbage_bits << 
code_length);
+                decoding_table[(extended_code_value & 0xfff) as usize] = 
decoding_entry;
+            }
+        }
+        decoding_table
+    }
+
+    fn make_inverse_permutation(perm: &[u8], len: usize) -> Vec<u8> {
+        let mut inverse = vec![0; len];
+        for i in 0..len {
+            inverse[perm[i] as usize] = i as u8;
+        }
+        for i in 0..len {
+            assert_eq!(perm[inverse[i] as usize], i as u8);
+        }
+        inverse
+    }
+
+    fn validate_decoding_table(decoding_table: &[u16], encoding_table: &[u16]) 
{
+        for decode_this in 0..4096 {
+            let tmp_d = decoding_table[decode_this];
+            let decoded_byte = tmp_d & 0xff;
+            let decoded_length = tmp_d >> 8;
+
+            let tmp_e = encoding_table[decoded_byte as usize];
+            let encoded_bit_pattern = tmp_e & 0xfff;
+            let encoded_length = tmp_e >> 12;
+
+            assert_eq!(decoded_length, encoded_length);
+            assert_eq!(
+                encoded_bit_pattern,
+                (decode_this as u16) & ((1 << decoded_length) - 1)
+            );
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to