This is an automated email from the ASF dual-hosted git repository.
placave pushed a commit to branch cpc-sketch
in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
The following commit(s) were added to refs/heads/cpc-sketch by this push:
new 9de355f CPCSketch Union init
9de355f is described below
commit 9de355f04fae7b805b436183cd8f7397a73d0f11
Author: Pierre Lacave <[email protected]>
AuthorDate: Fri Aug 9 05:14:44 2024 +0800
CPCSketch Union init
---
cpc/cpc_sketch.go | 67 ++++++--------
cpc/cpc_sketch_test.go | 6 ++
cpc/cpc_union.go | 230 +++++++++++++++++++++++++++++++++++++++++++++++++
cpc/utils.go | 57 ++++++++++++
4 files changed, 317 insertions(+), 43 deletions(-)
diff --git a/cpc/cpc_sketch.go b/cpc/cpc_sketch.go
index 0f631cd..4b294d8 100644
--- a/cpc/cpc_sketch.go
+++ b/cpc/cpc_sketch.go
@@ -52,12 +52,12 @@ type CpcSketch struct {
scratch [8]byte
}
-func NewCpcSketch(lgK int, seed uint64) (*CpcSketch, error) {
+func NewCpcSketch(lgK int, seed uint64) (CpcSketch, error) {
if err := checkLgK(lgK); err != nil {
- return nil, err
+ return CpcSketch{}, err
}
- return &CpcSketch{
+ return CpcSketch{
lgK: lgK,
seed: seed,
kxp: float64(int64(1) << lgK),
@@ -260,46 +260,6 @@ func (c *CpcSketch) promoteSparseToWindowed() {
c.pairTable = newTable
}
-/*
- //In terms of flavor, this promotes SPARSE to HYBRID.
- private static void promoteSparseToWindowed(final CpcSketch sketch) {
- final int lgK = sketch.lgK;
- final int k = (1 << lgK);
- final long c32 = sketch.numCoupons << 5;
- assert ((c32 == (3 * k)) || ((lgK == 4) && (c32 > (3 * k))));
-
- final byte[] window = new byte[k];
-
- final PairTable newTable = new PairTable(2, 6 + lgK);
- final PairTable oldTable = sketch.pairTable;
-
- final int[] oldSlots = oldTable.getSlotsArr();
- final int oldNumSlots = (1 << oldTable.getLgSizeInts());
-
- assert (sketch.windowOffset == 0);
-
- for (int i = 0; i < oldNumSlots; i++) {
- final int rowCol = oldSlots[i];
- if (rowCol != -1) {
- final int col = rowCol & 63;
- if (col < 8) {
- final int row = rowCol >>> 6;
- window[row] |= (1 << col);
- }
- else {
- // cannot use Table.mustInsert(), because it doesn't provide for
growth
- final boolean isNovel = PairTable.maybeInsert(newTable, rowCol);
- assert (isNovel == true);
- }
- }
- }
-
- assert (sketch.slidingWindow == null);
- sketch.slidingWindow = window;
- sketch.pairTable = newTable;
- }
-*/
-
func (c *CpcSketch) reset() {
c.numCoupons = 0
c.mergeFlag = false
@@ -310,3 +270,24 @@ func (c *CpcSketch) reset() {
c.kxp = float64(int64(1) << c.lgK)
c.hipEstAccum = 0
}
+
+// rowColUpdate folds one coupon, encoded as row<<6 | col, into the sketch.
+//
+// Coupons whose column is below fiCol are ignored. An empty sketch is first
+// promoted to sparse. While 32*numCoupons < 3*k the coupon is inserted into
+// the sparse pair table.
+//
+// NOTE(review): the windowed branch is still a TODO. Once the sketch is past
+// the sparse threshold the coupon is dropped and nil is returned, so callers
+// such as walkTableUpdatingSketch currently lose those coupons silently.
+func (c *CpcSketch) rowColUpdate(rowCol int) error {
+	if rowCol&63 < c.fiCol {
+		return nil
+	}
+	if c.numCoupons == 0 {
+		if err := c.promoteEmptyToSparse(); err != nil {
+			return err
+		}
+	}
+	sparseLimit := 3 * (uint64(1) << c.lgK)
+	if c.numCoupons<<5 >= sparseLimit {
+		// TODO(pierre)
+		// return c.updateWindowed(rowCol)
+		return nil
+	}
+	return c.updateSparse(rowCol)
+}
diff --git a/cpc/cpc_sketch_test.go b/cpc/cpc_sketch_test.go
index cb0661f..791022b 100644
--- a/cpc/cpc_sketch_test.go
+++ b/cpc/cpc_sketch_test.go
@@ -67,6 +67,12 @@ func TestCPCCheckEstimatesWithMerge(t *testing.T) {
err = sk2.UpdateUint64(uint64(i + n))
assert.NoError(t, err)
}
+ union, err := NewCpcUnionSketchWithDefault(lgk)
+ assert.NoError(t, err)
+ err = union.Update(sk1)
+ assert.NoError(t, err)
+ err = union.Update(sk2)
+ assert.NoError(t, err)
}
/*
diff --git a/cpc/cpc_union.go b/cpc/cpc_union.go
new file mode 100644
index 0000000..4575e78
--- /dev/null
+++ b/cpc/cpc_union.go
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cpc
+
+import (
+ "fmt"
+ "github.com/apache/datasketches-go/internal"
+)
+
+// CpcUnion merges CPC sketches that were built with the same hash seed.
+//
+// The union's state lives either in accumulator or in bitMatrix, never in
+// both. accumulator is a CpcSketch value rather than a pointer, so a
+// zero-valued accumulator (lgK == 0) is what stands for "no accumulator".
+type CpcUnion struct {
+	seed uint64
+	lgK  int
+
+	// Note: at most one of bitMatrix and accumulator is in use at any given moment.
+	// accumulator is a sketch object that is employed until it graduates out of Sparse mode.
+	// At that point, it is converted into a full-sized bitMatrix, which is mathematically a sketch,
+	// but doesn't maintain any of the "extra" fields of our sketch objects, so some additional work
+	// is required when the union's result is produced at the end.
+	bitMatrix   []uint64
+	accumulator CpcSketch
+}
+
+// NewCpcUnionSketch returns an empty union for sketches of size lgK built
+// with the given hash seed.
+//
+// lgK is validated by NewCpcSketch; its error is returned unchanged. The
+// union starts in accumulator mode (no bit matrix) with an empty sketch.
+func NewCpcUnionSketch(lgK int, seed uint64) (CpcUnion, error) {
+	// The accumulator must carry the union's own seed. Building it with the
+	// default seed would leave unions created with a custom seed holding an
+	// accumulator that disagrees with every source merged into it.
+	acc, err := NewCpcSketch(lgK, seed)
+	if err != nil {
+		return CpcUnion{}, err
+	}
+	return CpcUnion{
+		seed: seed,
+		lgK:  lgK,
+		// We begin with the accumulator holding an empty sketch object.
+		// As an optimization the accumulator could start absent, but that
+		// would require changes elsewhere.
+		accumulator: acc,
+	}, nil
+}
+
+// NewCpcUnionSketchWithDefault is NewCpcUnionSketch with the hash seed set to
+// internal.DEFAULT_UPDATE_SEED.
+func NewCpcUnionSketchWithDefault(lgK int) (CpcUnion, error) {
+	var defaultSeed uint64 = internal.DEFAULT_UPDATE_SEED
+	return NewCpcUnionSketch(lgK, defaultSeed)
+}
+
+// Update merges source into the union.
+//
+// It returns the checkSeeds error if source was built with a different seed.
+// Empty sources are ignored. If source.lgK is smaller than the union's lgK,
+// the union is first reduced to source.lgK. A source with a larger lgK is
+// downsampled as it is merged. While the union and the source are both
+// SPARSE, coupons are collected in the accumulator. Once either side is past
+// SPARSE, the union is a bit matrix and the source is OR-ed into it.
+// source is only read; its pair table and window are never modified.
+func (u *CpcUnion) Update(source CpcSketch) error {
+	if err := checkSeeds(u.seed, source.seed); err != nil {
+		return err
+	}
+
+	sourceFlavor := source.GetFlavor()
+	if sourceFlavor == CpcFlavorEmpty {
+		return nil
+	}
+
+	// Accumulator and bitMatrix must be mutually exclusive; while the
+	// accumulator is in use the union must be EMPTY or SPARSE.
+	if err := u.checkUnionState(); err != nil {
+		return err
+	}
+
+	if source.lgK < u.lgK {
+		if err := u.reduceUnionK(source.lgK); err != nil {
+			return err
+		}
+	}
+
+	// A source past SPARSE mode can only be merged into a bit matrix.
+	if sourceFlavor != CpcFlavorSparse && u.bitMatrix == nil {
+		u.bitMatrix = bitMatrixOfSketch(u.accumulator)
+		u.accumulator = CpcSketch{}
+	}
+
+	switch {
+	case sourceFlavor == CpcFlavorSparse && u.bitMatrix == nil:
+		return u.mergeSparseIntoAccumulator(&source)
+	case sourceFlavor == CpcFlavorSparse:
+		orTableIntoMatrix(u.bitMatrix, u.lgK, source.pairTable)
+	case sourceFlavor == CpcFlavorSliding:
+		// SLIDING mode inverts the bits of its early zone, so the source
+		// cannot simply be walked; expand it to a bit matrix and OR that in.
+		orMatrixIntoMatrix(u.bitMatrix, u.lgK, bitMatrixOfSketch(source), source.lgK)
+	default:
+		// The remaining windowed flavors (HYBRID and PINNED in the Java
+		// reference) keep every set bit in the window or the pair table.
+		orWindowIntoMatrix(u.bitMatrix, u.lgK, &source)
+		orTableIntoMatrix(u.bitMatrix, u.lgK, source.pairTable)
+	}
+	return nil
+}
+
+// mergeSparseIntoAccumulator walks a SPARSE source's pair table into the
+// union's accumulator. If the accumulator leaves SPARSE mode, the union is
+// switched to a bit matrix and the whole source table is OR-ed into it.
+func (u *CpcUnion) mergeSparseIntoAccumulator(source *CpcSketch) error {
+	if u.accumulator.lgK == 0 {
+		return fmt.Errorf("union accumulator can never be nil here")
+	}
+	// Unlike the Java reference, an EMPTY accumulator is not replaced by a
+	// copy of the source: copying a CpcSketch value shares its pair table,
+	// so later union updates would mutate the caller's sketch.
+	if err := walkTableUpdatingSketch(&u.accumulator, source.pairTable); err != nil {
+		return err
+	}
+	if u.accumulator.GetFlavor() == CpcFlavorSparse {
+		return nil
+	}
+	u.bitMatrix = bitMatrixOfSketch(u.accumulator)
+	u.accumulator = CpcSketch{}
+	// OR-ing the full table is idempotent for coupons the accumulator already
+	// absorbed. It also restores any coupons that rowColUpdate did not apply
+	// after the accumulator left SPARSE mode (its windowed branch is a TODO).
+	orTableIntoMatrix(u.bitMatrix, u.lgK, source.pairTable)
+	return nil
+}
+
+// orTableIntoMatrix sets the bit of every occupied pair-table slot in
+// bitMatrix, folding rows onto 1<<destLgK rows (downsamples when destLgK is
+// smaller than the table's sketch lgK).
+func orTableIntoMatrix(bitMatrix []uint64, destLgK int, table *pairTable) {
+	slots := table.slotsArr
+	numSlots := 1 << table.lgSizeInts
+	destMask := 1<<destLgK - 1
+	for i := 0; i < numSlots; i++ {
+		rowCol := slots[i]
+		if rowCol == -1 {
+			continue
+		}
+		col := rowCol & 63
+		row := rowCol >> 6
+		bitMatrix[row&destMask] |= 1 << col
+	}
+}
+
+// orWindowIntoMatrix ORs src's sliding window into destMatrix, shifting each
+// window byte to src.windowOffset and folding rows onto 1<<destLgK rows.
+func orWindowIntoMatrix(destMatrix []uint64, destLgK int, src *CpcSketch) {
+	destMask := 1<<destLgK - 1
+	for srcRow, bits := range src.slidingWindow {
+		destMatrix[srcRow&destMask] |= uint64(bits) << src.windowOffset
+	}
+}
+
+// checkUnionState verifies the union's internal invariants.
+//
+// Exactly one of accumulator and bitMatrix must be in use. A zero-valued
+// accumulator (lgK == 0) means "no accumulator". While the accumulator is in
+// use it must be EMPTY or SPARSE: if it holds coupons it needs a pair table
+// and no sliding window. Its lgK must also equal the union's lgK.
+func (u *CpcUnion) checkUnionState() error {
+	if u == nil {
+		return fmt.Errorf("union cannot be nil")
+	}
+
+	hasAccumulator := u.accumulator.lgK != 0
+	// Reject both "both in use" and "neither in use", as the message states.
+	if hasAccumulator == (u.bitMatrix != nil) {
+		return fmt.Errorf("accumulator and bitMatrix cannot be both valid or both nil")
+	}
+	if !hasAccumulator {
+		return nil
+	}
+	if u.accumulator.numCoupons > 0 &&
+		(u.accumulator.slidingWindow != nil || u.accumulator.pairTable == nil) {
+		return fmt.Errorf("Non-empty union accumulator must be SPARSE")
+	}
+	if u.lgK != u.accumulator.lgK {
+		return fmt.Errorf("union LgK must equal accumulator LgK")
+	}
+	return nil
+}
+
+// reduceUnionK shrinks the union to newLgK. It does nothing unless newLgK is
+// smaller than u.lgK.
+//
+// A bit-matrix union is folded into a matrix of 1<<newLgK rows. An
+// accumulator union is rebuilt as a sketch of size newLgK by re-walking the
+// old pair table. If that sketch stays SPARSE it becomes the new accumulator.
+// Otherwise the union switches to a bit matrix and the accumulator is cleared.
+func (u *CpcUnion) reduceUnionK(newLgK int) error {
+	if newLgK >= u.lgK {
+		return nil
+	}
+
+	if u.bitMatrix != nil {
+		// Downsample the union's bit matrix.
+		newMatrix := make([]uint64, 1<<newLgK)
+		orMatrixIntoMatrix(newMatrix, newLgK, u.bitMatrix, u.lgK)
+		u.bitMatrix = newMatrix
+		u.lgK = newLgK
+		return nil
+	}
+
+	// Downsample the union's accumulator.
+	oldSketch := u.accumulator
+	newSketch, err := NewCpcSketch(newLgK, oldSketch.seed)
+	if err != nil {
+		return err
+	}
+	if oldSketch.numCoupons == 0 {
+		u.accumulator = newSketch
+		u.lgK = newLgK
+		return nil
+	}
+	if err := walkTableUpdatingSketch(&newSketch, oldSketch.pairTable); err != nil {
+		return err
+	}
+	if newSketch.GetFlavor() == CpcFlavorSparse {
+		u.accumulator = newSketch
+		u.lgK = newLgK
+		return nil
+	}
+
+	// The downsampled sketch graduated beyond SPARSE, so the union becomes a
+	// bit matrix. The matrix is built from the old accumulator, not from
+	// newSketch, because rowColUpdate does not yet apply coupons once a
+	// sketch is windowed, so newSketch may be missing some of them.
+	newMatrix := make([]uint64, 1<<newLgK)
+	orMatrixIntoMatrix(newMatrix, newLgK, bitMatrixOfSketch(oldSketch), oldSketch.lgK)
+	u.bitMatrix = newMatrix
+	// Clear the accumulator (the zero value marks it unused). Leaving it set
+	// would make checkUnionState reject the union on the next update.
+	u.accumulator = CpcSketch{}
+	u.lgK = newLgK
+	return nil
+}
+
+// walkTableUpdatingSketch feeds every occupied slot of table into dest via
+// rowColUpdate. Each row is masked to dest's lgK, so a table from a sketch
+// with a larger lgK is downsampled. The first rowColUpdate error is returned.
+//
+// Slots are visited in a scrambled order: the index advances by an odd
+// stride (about numSlots*InverseGolden) modulo the power-of-two slot count,
+// which visits every slot exactly once.
+func walkTableUpdatingSketch(dest *CpcSketch, table *pairTable) error {
+	slots := table.slotsArr
+	numSlots := 1 << table.lgSizeInts
+	slotMask := numSlots - 1
+	// Keep all 6 column bits, but only the row bits that fit in dest's k rows.
+	destMask := ((1<<dest.lgK)-1)<<6 | 63
+
+	// Forcing the stride odd makes it coprime with the power-of-two size.
+	stride := int(internal.InverseGolden*float64(numSlots)) | 1
+
+	for visited, slot := 0, 0; visited < numSlots; visited++ {
+		if rowCol := slots[slot]; rowCol != -1 {
+			if err := dest.rowColUpdate(rowCol & destMask); err != nil {
+				return err
+			}
+		}
+		slot = (slot + stride) & slotMask
+	}
+	return nil
+}
diff --git a/cpc/utils.go b/cpc/utils.go
index 5897888..0759c5a 100644
--- a/cpc/utils.go
+++ b/cpc/utils.go
@@ -59,6 +59,13 @@ func checkLgSizeInts(lgSizeInts int) error {
return nil
}
+// checkSeeds returns nil when both seeds match, and otherwise an error naming
+// the two incompatible seeds.
+func checkSeeds(seedA uint64, seedB uint64) error {
+	if seedA == seedB {
+		return nil
+	}
+	return fmt.Errorf("Incompatible seeds: %d %d", seedA, seedB)
+}
+
func determineFlavor(lgK int, numCoupons uint64) CpcFlavor {
c := numCoupons
k := uint64(1) << lgK
@@ -79,3 +86,53 @@ func determineFlavor(lgK int, numCoupons uint64) CpcFlavor {
}
return CpcFlavorSliding // 27K/8 <= C
}
+
+// orMatrixIntoMatrix ORs the first 1<<srcLgK rows of srcMatrix into
+// destMatrix. Source row r lands on destination row r & (1<<destLgK - 1),
+// so a larger source is folded (downsampled) onto the smaller destination.
+// destMatrix must have at least min(1<<destLgK, 1<<srcLgK) rows.
+func orMatrixIntoMatrix(destMatrix []uint64, destLgK int, srcMatrix []uint64, srcLgK int) {
+	mask := 1<<destLgK - 1
+	for row, rows := 0, 1<<srcLgK; row < rows; row++ {
+		destMatrix[row&mask] |= srcMatrix[row]
+	}
+}
+
+// bitMatrixOfSketch expands a sketch into its full bit matrix: k = 1<<lgK
+// rows, each a uint64 of column bits.
+//
+// An empty sketch yields an all-zero matrix. Otherwise every row starts with
+// its low windowOffset columns set (the "early zone"). The sliding window's
+// bytes, if any, are OR-ed in at windowOffset, and each pair-table entry then
+// flips its (row, col) bit.
+//
+// It panics if windowOffset is outside [0, 56] or if a non-empty sketch has
+// no pair table.
+func bitMatrixOfSketch(sketch CpcSketch) []uint64 {
+	k := uint64(1) << sketch.lgK
+	offset := sketch.windowOffset
+	if offset < 0 || offset > 56 {
+		panic("offset < 0 || offset > 56")
+	}
+	matrix := make([]uint64, k)
+	if sketch.numCoupons == 0 {
+		return matrix // Returning a matrix of zeros rather than nil.
+	}
+	// Fill the matrix with default rows in which the "early zone" is filled
+	// with ones. This is essential for the routine's O(k) time cost (as
+	// opposed to O(C)). The row is computed in uint64: shifting a plain int
+	// by up to 56 bits overflows on 32-bit platforms.
+	defaultRow := uint64(1)<<offset - 1
+	for i := range matrix {
+		matrix[i] = defaultRow
+	}
+	if sketch.slidingWindow != nil { // Window mode, not sparse mode.
+		// Set the window bits, trusting the sketch's current offset.
+		for i, v := range sketch.slidingWindow {
+			matrix[i] |= uint64(v) << offset
+		}
+	}
+	table := sketch.pairTable
+	if table == nil {
+		panic("table == nil")
+	}
+	slots := table.slotsArr
+	numSlots := 1 << table.lgSizeInts
+	for i := 0; i < numSlots; i++ {
+		rowCol := slots[i]
+		if rowCol == -1 {
+			continue
+		}
+		col := rowCol & 63
+		row := rowCol >> 6
+		// Flip the specified matrix bit from its default value.
+		// In the "early" zone the bit changes from 1 to 0.
+		// In the "late" zone the bit changes from 0 to 1.
+		matrix[row] ^= 1 << col
+	}
+	return matrix
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]