This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch compression in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
commit e63fb9b4f3d136ecd08d5b05c7c304dd76d0f29f Author: tison <[email protected]> AuthorDate: Wed Feb 4 17:48:23 2026 +0800 CompressedState continue Signed-off-by: tison <[email protected]> --- datasketches/src/cpc/compression.rs | 196 +++++++++++++++++++++++++++++-- datasketches/src/cpc/compression_data.rs | 193 +++++++++++++++--------------- 2 files changed, 281 insertions(+), 108 deletions(-) diff --git a/datasketches/src/cpc/compression.rs b/datasketches/src/cpc/compression.rs index 4d5cd2a..4eb03ce 100644 --- a/datasketches/src/cpc/compression.rs +++ b/datasketches/src/cpc/compression.rs @@ -15,11 +15,16 @@ // specific language governing permissions and limitations // under the License. -use crate::cpc::compression_data::LENGTH_LIMITED_UNARY_ENCODING_TABLE65; -use crate::cpc::pair_table::{PairTable, introspective_insertion_sort}; -use crate::cpc::{CpcSketch, Flavor}; use std::cmp::Ordering; +use crate::cpc::CpcSketch; +use crate::cpc::Flavor; +use crate::cpc::compression_data::COLUMN_PERMUTATIONS_FOR_ENCODING; +use crate::cpc::compression_data::ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE; +use crate::cpc::compression_data::LENGTH_LIMITED_UNARY_ENCODING_TABLE65; +use crate::cpc::pair_table::PairTable; +use crate::cpc::pair_table::introspective_insertion_sort; + pub(super) struct CompressedState { table_data: Vec<u32>, table_data_words: usize, @@ -34,7 +39,6 @@ impl CompressedState { match source.flavor() { Flavor::EMPTY => { // do nothing - return; } Flavor::SPARSE => { self.compress_sparse_flavor(source); @@ -87,7 +91,7 @@ impl CompressedState { let mut byte = source.sliding_window[row_index]; while byte != 0 { let col_index = byte.trailing_zeros(); - byte = byte ^ (1 << col_index); // erase the 1 + byte ^= 1 << col_index; // erase the 1 all_pairs[idx] = ((row_index << 6) as u32) | col_index; idx += 1; } @@ -117,18 +121,63 @@ impl CompressedState { self.compress_surprising_values(&all_pairs, source.lg_k()); } - fn compress_pinned_flavor(&mut self, source: &CpcSketch) {} + fn compress_pinned_flavor(&mut self, source: &CpcSketch) { + self.compress_sliding_window(&source.sliding_window, source.lg_k(), source.num_coupons()); + let mut pairs = source.surprising_value_table().unwrapping_get_items(); + if !pairs.is_empty() { + // Here we subtract 8 from the column indices. Because they are stored in the low 6 bits + // of each row_col pair, and because no column index is less than 8 for a "Pinned" + // sketch, we can simply subtract 8 from the pairs themselves. + + // shift the columns over by 8 positions before compressing (because of the window) + for pair in &mut pairs { + assert!(*pair & 63 >= 8, "pair column index is less than 8: {pair}"); + *pair -= 8; + } + + introspective_insertion_sort(&mut pairs); + self.compress_surprising_values(&pairs, source.lg_k()); + } + } - fn compress_sliding_flavor(&mut self, source: &CpcSketch) {} + // Complicated by the existence of both a left fringe and a right fringe. + fn compress_sliding_flavor(&mut self, source: &CpcSketch) { + self.compress_sliding_window(&source.sliding_window, source.lg_k(), source.num_coupons()); + let mut pairs = source.surprising_value_table().unwrapping_get_items(); + if !pairs.is_empty() { + // Here we apply a complicated transformation to the column indices, which + // changes the implied ordering of the pairs, so we must do it before sorting. + + let pseudo_phase = determine_pseudo_phase(source.lg_k(), source.num_coupons()); + let permutation = COLUMN_PERMUTATIONS_FOR_ENCODING[pseudo_phase as usize]; + let offset = source.window_offset; + debug_assert!(offset <= 56); + for pair in &mut pairs { + let row_col = *pair; + let row = row_col >> 6; + let mut col = (row_col & 63) as u8; + // first rotate the columns into a canonical configuration: + // new = ((old - (offset+8)) + 64) mod 64 + col = (col + 56 - offset) & 63; + debug_assert!(col < 56); + // then apply the permutation + col = permutation[col as usize]; + *pair = (row << 6) | (col as u32); + } + + introspective_insertion_sort(&mut pairs); + self.compress_surprising_values(&pairs, source.lg_k()); + } + } fn compress_surprising_values(&mut self, pairs: &[u32], lg_k: u8) { let k = 1 << lg_k; let num_pairs = pairs.len() as u32; let num_base_bits = golomb_choose_number_of_base_bits(k + num_pairs, num_pairs as u64); let table_len = safe_length_for_compressed_pair_buf(k, num_pairs, num_base_bits); - self.table_data.truncate(table_len); + self.table_data.resize(table_len, 0); - let compressed_surprising_values = self.low_level_compress_pairs(&pairs, num_base_bits); + let compressed_surprising_values = self.low_level_compress_pairs(pairs, num_base_bits); // At this point we could free the unused portion of the compression output buffer, // but it is not necessary if it is temporary @@ -138,6 +187,81 @@ impl CompressedState { self.table_num_entries = num_pairs; } + fn compress_sliding_window(&mut self, window: &[u8], lg_k: u8, num_coupons: u32) { + let k = 1 << lg_k; + let window_buf_len = safe_length_for_compressed_window_buf(k); + self.window_data.resize(window_buf_len, 0); + let pseudo_phase = determine_pseudo_phase(lg_k, num_coupons); + let data_words = self.low_level_compress_bytes( + window, + k, + &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[pseudo_phase as usize], + ); + + // At this point we could free the unused portion of the compression output buffer, + // but it is not necessary if it is temporary + // Note: realloc caused strange timing spikes for lgK = 11 and 12. + + self.window_data_words = data_words; + } + + /// Returns the number of compressed words that were actually used. + /// + /// It is the caller's responsibility to ensure that the window_data is long enough. + fn low_level_compress_bytes( + &mut self, + byte_array: &[u8], + num_bytes_to_encode: u32, + encoding_table: &[u16], + ) -> usize { + // bits are packed into this first, then are flushed to window_data + let mut bitbuf = 0; + // number of bits currently in bitbuf; must be between 0 and 31 + let mut bufbits = 0; + let mut next_word_index = 0; + + for byte_index in 0..num_bytes_to_encode { + let code_info = encoding_table[byte_array[byte_index as usize] as usize]; + let code_val = (code_info & 0xfff) as u64; + let code_len = (code_info >> 12) as u8; + bitbuf |= code_val << bufbits; + bufbits += code_len; + maybe_flush_bitbuf( + &mut bitbuf, + &mut bufbits, + &mut self.window_data, + &mut next_word_index, + ); + } + + // Pad the bitstream with 11 zero-bits so that the decompressor's 12-bit peek can't overrun + // its input. + bufbits += 11; + maybe_flush_bitbuf( + &mut bitbuf, + &mut bufbits, + &mut self.window_data, + &mut next_word_index, + ); + + if bufbits > 0 { + // We are done encoding now, so we flush the bit buffer. + debug_assert!(bufbits < 32); + self.window_data[next_word_index] = (bitbuf & 0xffffffff) as u32; + next_word_index += 1; + + // not really necessary unset since no more use + //bitbuf = 0; + //bufbits = 0; + } + + next_word_index + } + + /// Returns the number of table_data actually used. + /// + /// Here "pairs" refers to row/column pairs that specify the positions of surprising values in + /// the bit matrix. fn low_level_compress_pairs(&mut self, pairs: &[u32], num_base_bits: u8) -> usize { let mut bitbuf = 0; let mut bufbits = 0; @@ -165,9 +289,9 @@ impl CompressedState { predicted_col_index = col_index + 1; let code_info = LENGTH_LIMITED_UNARY_ENCODING_TABLE65[x_delta as usize]; - let code_val = code_info & 0xfff; + let code_val = (code_info & 0xfff) as u64; let code_len = (code_info >> 12) as u8; - bitbuf |= (code_val << bufbits) as u64; + bitbuf |= code_val << bufbits; bufbits += code_len; maybe_flush_bitbuf( @@ -213,7 +337,7 @@ impl CompressedState { self.table_data[next_word_index] = (bitbuf & 0xffffffff) as u32; next_word_index += 1; - // not really necessary unset + // not really necessary unset since no more use //bitbuf = 0; //bufbits = 0; } @@ -227,6 +351,42 @@ pub(super) struct UncompressedState { window: Vec<u8>, } +fn determine_pseudo_phase(lg_k: u8, num_coupons: u32) -> u8 { + let k = 1 << lg_k; + // This mid-range logic produces pseudo-phases. They are used to select encoding tables. + // The thresholds were chosen by hand after looking at plots of measured compression. + if 1000 * num_coupons < 2375 * k { + if 4 * num_coupons < 3 * k { + // mid-range table + 16 + } else if 10 * num_coupons < 11 * k { + // mid-range table + 16 + 1 + } else if 100 * num_coupons < 132 * k { + // mid-range table + 16 + 2 + } else if 3 * num_coupons < 5 * k { + // mid-range table + 16 + 3 + } else if 1000 * num_coupons < 1965 * k { + // mid-range table + 16 + 4 + } else if 1000 * num_coupons < 2275 * k { + // mid-range table + 16 + 5 + } else { + // steady-state table employed before its actual phase + 6 + } + } else { + // This steady-state logic produces true phases. They are used to select + // encoding tables, and also column permutations for the "Sliding" flavor. + debug_assert!(lg_k >= 4); + let tmp = num_coupons >> (lg_k - 4); + (tmp & 15) as u8 // phase + } +} + fn write_unary( compressed_words: &mut [u32], next_word_index: &mut usize, @@ -265,6 +425,18 @@ fn maybe_flush_bitbuf( } } +// Explanation of padding: we write +// 1) xdelta (huffman, provides at least 1 bit, requires 12-bit lookahead) +// 2) ydeltaGolombHi (unary, provides at least 1 bit, requires 8-bit lookahead) +// 3) ydeltaGolombLo (straight B bits). +// So the 12-bit lookahead is the tight constraint, but there are at least (2 + B) bits emitted, +// so we would be safe with max (0, 10 - B) bits of padding at the end of the bitstream. +fn safe_length_for_compressed_window_buf(k: u32) -> usize { + // 11 bits of padding, due to 12-bit lookahead, with 1 bit certainly present. + let bits = 12 * k + 11; + divide_longs_rounding_up(bits as usize, 32) +} + fn safe_length_for_compressed_pair_buf(k: u32, num_pairs: u32, num_base_bits: u8) -> usize { // Long ybits = k + numPairs; // simpler and safer UB // The following tighter UB on ybits is based on page 198 diff --git a/datasketches/src/cpc/compression_data.rs b/datasketches/src/cpc/compression_data.rs index a8900ba..3c8c464 100644 --- a/datasketches/src/cpc/compression_data.rs +++ b/datasketches/src/cpc/compression_data.rs @@ -15,102 +15,6 @@ // specific language governing permissions and limitations // under the License. -#[cfg(test)] -mod tests { - use super::*; - use googletest::assert_that; - use googletest::prelude::container_eq; - - #[test] - fn decoding_tables() { - let length_limited_unary_decoding_table65 = - make_decoding_table(&LENGTH_LIMITED_UNARY_ENCODING_TABLE65, 65); - validate_decoding_table( - &length_limited_unary_decoding_table65, - &LENGTH_LIMITED_UNARY_ENCODING_TABLE65, - ); - assert_that!( - length_limited_unary_decoding_table65, - container_eq(LENGTH_LIMITED_UNARY_DECODING_TABLE65), - ); - - let mut decoding_tables_for_high_entropy_byte = vec![]; - for i in 0..(16 + 6) { - decoding_tables_for_high_entropy_byte.push(make_decoding_table( - &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i], - 256, - )); - validate_decoding_table( - &decoding_tables_for_high_entropy_byte[i], - &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i], - ); - } - assert_that!( - decoding_tables_for_high_entropy_byte, - container_eq(DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE), - ); - - let mut column_permutations_for_decoding = vec![]; - for i in 0..16 { - column_permutations_for_decoding.push(make_inverse_permutation( - &COLUMN_PERMUTATIONS_FOR_ENCODING[i], - 56, - )); - } - assert_that!( - column_permutations_for_decoding, - container_eq(COLUMN_PERMUTATIONS_FOR_DECODING), - ); - } - - fn make_decoding_table(encoding_table: &[u16], num_byte_values: u16) -> Vec<u16> { - assert_eq!(encoding_table.len(), num_byte_values as usize); - let mut decoding_table = vec![0; 4096]; - for byte_value in 0..num_byte_values { - let encoding_entry = encoding_table[byte_value as usize]; - let code_value = encoding_entry & 0xfff; - let code_length = encoding_entry >> 12; - let decoding_entry = (code_length << 8) | byte_value; - let garbage_length = 12 - code_length; - let num_copies = 1 << garbage_length; - for garbage_bits in 0..num_copies { - let extended_code_value = code_value | (garbage_bits << code_length); - decoding_table[(extended_code_value & 0xfff) as usize] = decoding_entry; - } - } - decoding_table - } - - fn make_inverse_permutation(perm: &[u8], len: usize) -> Vec<u8> { - let mut inverse = vec![0; len]; - for i in 0..len { - inverse[perm[i] as usize] = i as u8; - } - for i in 0..len { - assert_eq!(perm[inverse[i] as usize], i as u8); - } - inverse - } - - fn validate_decoding_table(decoding_table: &[u16], encoding_table: &[u16]) { - for decode_this in 0..4096 { - let tmp_d = decoding_table[decode_this]; - let decoded_byte = tmp_d & 0xff; - let decoded_length = tmp_d >> 8; - - let tmp_e = encoding_table[decoded_byte as usize]; - let encoded_bit_pattern = tmp_e & 0xfff; - let encoded_length = tmp_e >> 12; - - assert_eq!(decoded_length, encoded_length); - assert_eq!( - encoded_bit_pattern, - (decode_this as u16) & ((1 << decoded_length) - 1) - ); - } - } -} - /// Notice that there are only 65 symbols here, which is different from our usual 8->12 coding /// scheme which handles 256 symbols. pub(super) static LENGTH_LIMITED_UNARY_ENCODING_TABLE65: [u16; 65] = [ @@ -12055,3 +11959,100 @@ pub(super) static DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE: [[u16; 4096]; 22] = [ 515, 1029, 769, 1555, 515, 1289, 775, 2598, 515, 1282, 769, 1831, 515, 1295, 775, 3327, ], ]; + +#[cfg(test)] +mod tests { + use googletest::assert_that; + use googletest::prelude::container_eq; + + use super::*; + + #[test] + fn decoding_tables() { + let length_limited_unary_decoding_table65 = + make_decoding_table(&LENGTH_LIMITED_UNARY_ENCODING_TABLE65, 65); + validate_decoding_table( + &length_limited_unary_decoding_table65, + &LENGTH_LIMITED_UNARY_ENCODING_TABLE65, + ); + assert_that!( + length_limited_unary_decoding_table65, + container_eq(LENGTH_LIMITED_UNARY_DECODING_TABLE65), + ); + + let mut decoding_tables_for_high_entropy_byte = vec![]; + for i in 0..(16 + 6) { + decoding_tables_for_high_entropy_byte.push(make_decoding_table( + &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i], + 256, + )); + validate_decoding_table( + &decoding_tables_for_high_entropy_byte[i], + &ENCODING_TABLES_FOR_HIGH_ENTROPY_BYTE[i], + ); + } + assert_that!( + decoding_tables_for_high_entropy_byte, + container_eq(DECODING_TABLES_FOR_HIGH_ENTROPY_BYTE), + ); + + let mut column_permutations_for_decoding = vec![]; + for i in 0..16 { + column_permutations_for_decoding.push(make_inverse_permutation( + &COLUMN_PERMUTATIONS_FOR_ENCODING[i], + 56, + )); + } + assert_that!( + column_permutations_for_decoding, + container_eq(COLUMN_PERMUTATIONS_FOR_DECODING), + ); + } + + fn make_decoding_table(encoding_table: &[u16], num_byte_values: u16) -> Vec<u16> { + assert_eq!(encoding_table.len(), num_byte_values as usize); + let mut decoding_table = vec![0; 4096]; + for byte_value in 0..num_byte_values { + let encoding_entry = encoding_table[byte_value as usize]; + let code_value = encoding_entry & 0xfff; + let code_length = encoding_entry >> 12; + let decoding_entry = (code_length << 8) | byte_value; + let garbage_length = 12 - code_length; + let num_copies = 1 << garbage_length; + for garbage_bits in 0..num_copies { + let extended_code_value = code_value | (garbage_bits << code_length); + decoding_table[(extended_code_value & 0xfff) as usize] = decoding_entry; + } + } + decoding_table + } + + fn make_inverse_permutation(perm: &[u8], len: usize) -> Vec<u8> { + let mut inverse = vec![0; len]; + for i in 0..len { + inverse[perm[i] as usize] = i as u8; + } + for i in 0..len { + assert_eq!(perm[inverse[i] as usize], i as u8); + } + inverse + } + + fn validate_decoding_table(decoding_table: &[u16], encoding_table: &[u16]) { + for decode_this in 0..4096 { + let tmp_d = decoding_table[decode_this]; + let decoded_byte = tmp_d & 0xff; + let decoded_length = tmp_d >> 8; + + let tmp_e = encoding_table[decoded_byte as usize]; + let encoded_bit_pattern = tmp_e & 0xfff; + let encoded_length = tmp_e >> 12; + + assert_eq!(decoded_length, encoded_length); + assert_eq!( + encoded_bit_pattern, + (decode_this as u16) & ((1 << decoded_length) - 1) + ); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
