This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch compressed_iterator
in repository https://gitbox.apache.org/repos/asf/datasketches-java.git

commit adff89079a8439ce18ab373a34ab1730a8aa8f07
Author: AlexanderSaydakov <[email protected]>
AuthorDate: Sun Jan 26 13:26:35 2025 -0800

    use iteration in set ops, wrap compressed sketch and unpack in iterator
---
 .../org/apache/datasketches/theta/AnotBimpl.java   | 34 ++++++++++---
 .../datasketches/theta/CompactOperations.java      |  2 +-
 .../apache/datasketches/theta/CompactSketch.java   | 18 +++----
 .../datasketches/theta/DirectCompactSketch.java    | 12 ++---
 .../datasketches/theta/IntersectionImpl.java       | 59 +++++++++++++---------
 .../apache/datasketches/theta/PreambleUtil.java    |  3 ++
 .../java/org/apache/datasketches/theta/Sketch.java | 13 ++---
 .../org/apache/datasketches/theta/UnionImpl.java   | 55 ++++----------------
 8 files changed, 92 insertions(+), 104 deletions(-)

diff --git a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
index e7b2c99e..6d4c54fd 100644
--- a/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
+++ b/src/main/java/org/apache/datasketches/theta/AnotBimpl.java
@@ -20,8 +20,11 @@
 package org.apache.datasketches.theta;
 
 import static org.apache.datasketches.common.Util.exactLog2OfLong;
-import static org.apache.datasketches.thetacommon.HashOperations.convertToHashTable;
+import static org.apache.datasketches.thetacommon.HashOperations.checkThetaCorruption;
+import static org.apache.datasketches.thetacommon.HashOperations.continueCondition;
 import static org.apache.datasketches.thetacommon.HashOperations.hashSearch;
+import static org.apache.datasketches.thetacommon.HashOperations.hashSearchOrInsert;
+import static org.apache.datasketches.thetacommon.HashOperations.minLgHashTableSize;
 
 import java.util.Arrays;
 
@@ -124,7 +127,7 @@ final class AnotBimpl extends AnotB {
 
     if (skB.isEmpty()) {
       return skA.compact(dstOrdered, dstMem);
-   }
+    }
     ThetaUtil.checkSeedHashes(skB.getSeedHash(), seedHash_);
     //Both skA & skB are not empty
 
@@ -162,14 +165,12 @@ final class AnotBimpl extends AnotB {
       final long[] hashArrA,
       final Sketch skB) {
 
-    //Rebuild/get hashtable of skB
+    // Rebuild or get hashtable of skB
     final long[] hashTableB; //read only
-    final long[] thetaCache = skB.getCache();
-    final int countB = skB.getRetainedEntries(true);
     if (skB instanceof CompactSketch) {
-      hashTableB = convertToHashTable(thetaCache, countB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
+      hashTableB = convertToHashTable(skB, minThetaLong, ThetaUtil.REBUILD_THRESHOLD);
     } else {
-      hashTableB = thetaCache;
+      hashTableB = skB.getCache();
     }
 
     //build temporary result arrays of skA
@@ -191,6 +192,25 @@ final class AnotBimpl extends AnotB {
     return Arrays.copyOfRange(tmpHashArrA, 0, nonMatches);
   }
 
+  private static long[] convertToHashTable(
+      final Sketch sketch,
+      final long thetaLong,
+      final double rebuildThreshold) {
+    final int lgArrLongs = minLgHashTableSize(sketch.getRetainedEntries(true), rebuildThreshold);
+    final int arrLongs = 1 << lgArrLongs;
+    final long[] hashTable = new long[arrLongs];
+    checkThetaCorruption(thetaLong);
+    HashIterator it = sketch.iterator();
+    while (it.next()) {
+      final long hash = it.get();
+      if (continueCondition(thetaLong, hash) ) {
+        continue;
+      }
+      hashSearchOrInsert(hashTable, lgArrLongs, hash);
+    }
+    return hashTable;
+  }
+
   private void reset() {
     thetaLong_ = Long.MAX_VALUE;
     empty_ = true;
diff --git a/src/main/java/org/apache/datasketches/theta/CompactOperations.java b/src/main/java/org/apache/datasketches/theta/CompactOperations.java
index a8066314..2b52f59f 100644
--- a/src/main/java/org/apache/datasketches/theta/CompactOperations.java
+++ b/src/main/java/org/apache/datasketches/theta/CompactOperations.java
@@ -161,7 +161,7 @@ final class CompactOperations {
       final long hash = srcMem.getLong(srcPreLongs << 3);
       final SingleItemSketch sis = new SingleItemSketch(hash, srcSeedHash);
       if (dstMem != null) {
-        dstMem.putByteArray(0, sis.toByteArray(),0, 16);
+        dstMem.putByteArray(0, sis.toByteArray(), 0, 16);
         return new DirectCompactSketch(dstMem);
       } else { //heap
         return sis;
diff --git a/src/main/java/org/apache/datasketches/theta/CompactSketch.java b/src/main/java/org/apache/datasketches/theta/CompactSketch.java
index 1426368f..688ad274 100644
--- a/src/main/java/org/apache/datasketches/theta/CompactSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/CompactSketch.java
@@ -32,6 +32,7 @@ import static org.apache.datasketches.theta.PreambleUtil.extractSerVer;
 import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4;
 import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4;
 import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4;
+import static org.apache.datasketches.theta.PreambleUtil.wholeBytesToHoldBits;
 import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem;
 
 import org.apache.datasketches.common.Family;
@@ -189,7 +190,8 @@ public abstract class CompactSketch extends Sketch {
     if (serVer == 4) {
       // not wrapping the compressed format since currently we cannot take advantage of
       // decompression during iteration because set operations reach into memory directly
-      return heapifyV4(srcMem, seed, enforceSeed);
+      return DirectCompactCompressedSketch.wrapInstance(srcMem,
+          enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
     }
     else if (serVer == 3) {
       if (PreambleUtil.isEmptyFlag(srcMem)) {
@@ -274,10 +276,6 @@ public abstract class CompactSketch extends Sketch {
     return Long.numberOfLeadingZeros(ored);
   }
 
-  private static int wholeBytesToHoldBits(final int bits) {
-    return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
-  }
-
   private byte[] toByteArrayV4() {
     final int preambleLongs = isEstimationMode() ? 2 : 1;
     final int entryBits = 64 - computeMinLeadingZeros();
@@ -286,8 +284,8 @@ public abstract class CompactSketch extends Sketch {
     // store num_entries as whole bytes since whole-byte blocks will follow (most probably)
     final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries()));
 
-    final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
-    final byte[] bytes = new byte[size];
+    final int sizeBytes = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
+    final byte[] bytes = new byte[sizeBytes];
     final WritableMemory mem = WritableMemory.writableWrap(bytes);
     int offsetBytes = 0;
     mem.putByte(offsetBytes++, (byte) preambleLongs);
@@ -334,12 +332,10 @@ public abstract class CompactSketch extends Sketch {
 
   private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) {
     final int preLongs = extractPreLongs(srcMem);
-    final int flags = extractFlags(srcMem);
     final int entryBits = extractEntryBitsV4(srcMem);
     final int numEntriesBytes = extractNumEntriesBytesV4(srcMem);
     final short seedHash = (short) extractSeedHash(srcMem);
-    final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0;
-    if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
+    if (enforceSeed) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
     int offsetBytes = 8;
     long theta = Long.MAX_VALUE;
     if (preLongs > 1) {
@@ -374,7 +370,7 @@ public abstract class CompactSketch extends Sketch {
       entries[i] += previous;
       previous = entries[i];
     }
-    return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true);
+    return new HeapCompactSketch(entries, false, seedHash, numEntries, theta, true);
   }
 
 }
diff --git a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
index 0f69ec3c..1714d216 100644
--- a/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
+++ b/src/main/java/org/apache/datasketches/theta/DirectCompactSketch.java
@@ -86,11 +86,7 @@ class DirectCompactSketch extends CompactSketch {
 
   @Override
   public double getEstimate() {
-    if (otherCheckForSingleItem(mem_)) { return 1; }
-    final int preLongs = extractPreLongs(mem_);
-    final int curCount = (preLongs == 1) ? 0 : extractCurCount(mem_);
-    final long thetaLong = (preLongs > 2) ? extractThetaLong(mem_) : Long.MAX_VALUE;
-    return Sketch.estimate(thetaLong, curCount);
+    return Sketch.estimate(getThetaLong(), getRetainedEntries());
   }
 
   @Override
@@ -142,10 +138,8 @@ class DirectCompactSketch extends CompactSketch {
 
   @Override
   public byte[] toByteArray() {
-    final int curCount = getRetainedEntries(true);
-    checkIllegalCurCountAndEmpty(isEmpty(), curCount);
-    final int preLongs = extractPreLongs(mem_);
-    final int outBytes = (curCount + preLongs) << 3;
+    checkIllegalCurCountAndEmpty(isEmpty(), getRetainedEntries());
+    final int outBytes = getCurrentBytes();
     final byte[] byteArrOut = new byte[outBytes];
     mem_.getByteArray(0, byteArrOut, 0, outBytes);
     return byteArrOut;
diff --git a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java
index fc81d112..3404a0b1 100644
--- a/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java
+++ b/src/main/java/org/apache/datasketches/theta/IntersectionImpl.java
@@ -288,7 +288,7 @@ class IntersectionImpl extends Intersection {
       else { //On the heap, allocate a HT
         hashTable_ = new long[1 << lgArrLongs_];
       }
-      moveDataToTgt(sketchIn.getCache(), curCount_);
+      moveDataToTgt(sketchIn);
     } //end of state 5
 
     //state 7
@@ -434,8 +434,6 @@ class IntersectionImpl extends Intersection {
   private void performIntersect(final Sketch sketchIn) {
     // curCount and input data are nonzero, match against HT
     assert curCount_ > 0 && !empty_;
-    final long[] cacheIn = sketchIn.getCache();
-    final int arrLongsIn = cacheIn.length;
     final long[] hashTable;
     if (wmem_ != null) {
       final int htLen = 1 << lgArrLongs_;
@@ -448,27 +446,16 @@ class IntersectionImpl extends Intersection {
     final long[] matchSet = new long[ min(curCount_, sketchIn.getRetainedEntries(true)) ];
 
     int matchSetCount = 0;
-    if (sketchIn.isOrdered()) {
-      //ordered compact, which enables early stop
-      for (int i = 0; i < arrLongsIn; i++ ) {
-        final long hashIn = cacheIn[i];
-        //if (hashIn <= 0L) continue;  //<= 0 should not happen
-        if (hashIn >= thetaLong_) {
-          break; //early stop assumes that hashes in input sketch are ordered!
-        }
+    HashIterator it = sketchIn.iterator();
+    while (it.next()) {
+      final long hashIn = it.get();
+      if (hashIn < thetaLong_) {
         final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
-        if (foundIdx == -1) { continue; }
-        matchSet[matchSetCount++] = hashIn;
-      }
-    }
-    else {
-      //either unordered compact or hash table
-      for (int i = 0; i < arrLongsIn; i++ ) {
-        final long hashIn = cacheIn[i];
-        if (hashIn <= 0L || hashIn >= thetaLong_) { continue; }
-        final int foundIdx = hashSearch(hashTable, lgArrLongs_, hashIn);
-        if (foundIdx == -1) { continue; }
-        matchSet[matchSetCount++] = hashIn;
+        if (foundIdx != -1) {
+          matchSet[matchSetCount++] = hashIn;
+        }
+      } else {
+        if (sketchIn.isOrdered()) { break; } // early stop
       }
     }
     //reduce effective array size to minimum
@@ -515,6 +502,32 @@ class IntersectionImpl extends Intersection {
     assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
   }
 
+  private void moveDataToTgt(final Sketch sketch) {
+    int count = sketch.getRetainedEntries();
+    int tmpCnt = 0;
+    if (wmem_ != null) { //Off Heap puts directly into mem
+      final int preBytes = CONST_PREAMBLE_LONGS << 3;
+      final int lgArrLongs = lgArrLongs_;
+      final long thetaLong = thetaLong_;
+      HashIterator it = sketch.iterator();
+      while (it.next()) {
+        final long hash = it.get();
+        if (continueCondition(thetaLong, hash)) { continue; }
+        hashInsertOnlyMemory(wmem_, lgArrLongs, hash, preBytes);
+        tmpCnt++;
+      }
+    } else { //On Heap. Assumes HT exists and is large enough
+      HashIterator it = sketch.iterator();
+      while (it.next()) {
+        final long hash = it.get();
+        if (continueCondition(thetaLong_, hash)) { continue; }
+        hashInsertOnly(hashTable_, lgArrLongs_, hash);
+        tmpCnt++;
+      }
+    }
+    assert tmpCnt == count : "Intersection Count Check: got: " + tmpCnt + ", expected: " + count;
+  }
+
   private void hardReset() {
     resetCommon();
     if (wmem_ != null) {
diff --git a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java
index e1d9262e..ec0bc126 100644
--- a/src/main/java/org/apache/datasketches/theta/PreambleUtil.java
+++ b/src/main/java/org/apache/datasketches/theta/PreambleUtil.java
@@ -524,4 +524,7 @@ final class PreambleUtil {
         + ", Required: " + required);
   }
 
+  static int wholeBytesToHoldBits(final int bits) {
+    return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
+  }
 }
diff --git a/src/main/java/org/apache/datasketches/theta/Sketch.java b/src/main/java/org/apache/datasketches/theta/Sketch.java
index 89618bc2..6583e2db 100644
--- a/src/main/java/org/apache/datasketches/theta/Sketch.java
+++ b/src/main/java/org/apache/datasketches/theta/Sketch.java
@@ -451,9 +451,8 @@ public abstract class Sketch implements MemoryStatus {
       final boolean hexMode) {
     final StringBuilder sb = new StringBuilder();
 
-    final long[] cache = getCache();
     int nomLongs = 0;
-    int arrLongs = cache.length;
+    int arrLongs = 0;
     float p = 0;
     int rf = 0;
     final boolean updateSketch = this instanceof UpdateSketch;
@@ -473,12 +472,10 @@ public abstract class Sketch implements MemoryStatus {
       final int w = width > 0 ? width : 8; // default is 8 wide
       if (curCount > 0) {
         sb.append("### SKETCH DATA DETAIL");
-        for (int i = 0, j = 0; i < arrLongs; i++ ) {
-          final long h;
-          h = cache[i];
-          if (h <= 0 || h >= thetaLong) {
-            continue;
-          }
+        HashIterator it = iterator();
+        int j = 0;
+        while (it.next()) {
+          final long h = it.get();
           if (j % w == 0) {
             sb.append(LS).append(String.format("   %6d", j + 1));
           }
diff --git a/src/main/java/org/apache/datasketches/theta/UnionImpl.java b/src/main/java/org/apache/datasketches/theta/UnionImpl.java
index bac05de7..4fee6a9c 100644
--- a/src/main/java/org/apache/datasketches/theta/UnionImpl.java
+++ b/src/main/java/org/apache/datasketches/theta/UnionImpl.java
@@ -320,46 +320,17 @@ final class UnionImpl extends Union {
     }
     //sketchIn is valid and not empty
     ThetaUtil.checkSeedHashes(expectedSeedHash_, sketchIn.getSeedHash());
-    if (sketchIn instanceof SingleItemSketch) {
-      gadget_.hashUpdate(sketchIn.getCache()[0]);
-      return;
-    }
     Sketch.checkSketchAndMemoryFlags(sketchIn);
 
     unionThetaLong_ = min(min(unionThetaLong_, sketchIn.getThetaLong()), gadget_.getThetaLong()); //Theta rule
     unionEmpty_ = false;
-    final int curCountIn = sketchIn.getRetainedEntries(true);
-    if (curCountIn > 0) {
-      if (sketchIn.isOrdered() && (sketchIn instanceof CompactSketch)) { //Use early stop
-        //Ordered, thus compact
-        if (sketchIn.hasMemory()) {
-          final Memory skMem = sketchIn.getMemory();
-          final int preambleLongs = skMem.getByte(PREAMBLE_LONGS_BYTE) & 0X3F;
-          for (int i = 0; i < curCountIn; i++ ) {
-            final int offsetBytes = preambleLongs + i << 3;
-            final long hashIn = skMem.getLong(offsetBytes);
-            if (hashIn >= unionThetaLong_) { break; } // "early stop"
-            gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
-          }
-        }
-        else { //sketchIn is on the Java Heap or has array
-          final long[] cacheIn = sketchIn.getCache(); //not a copy!
-          for (int i = 0; i < curCountIn; i++ ) {
-            final long hashIn = cacheIn[i];
-            if (hashIn >= unionThetaLong_) { break; } // "early stop"
-            gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
-          }
-        }
-      } //End ordered, compact
-      else { //either not-ordered compact or Hash Table form. A HT may have dirty values.
-        final long[] cacheIn = sketchIn.getCache(); //if off-heap this will be a copy
-        final int arrLongs = cacheIn.length;
-        for (int i = 0, c = 0; i < arrLongs && c < curCountIn; i++ ) {
-          final long hashIn = cacheIn[i];
-          if (hashIn <= 0L || hashIn >= unionThetaLong_) { continue; } //rejects dirty values
-          gadget_.hashUpdate(hashIn); //backdoor update, hash function is bypassed
-          c++; //ensures against invalid state inside the incoming sketch
-        }
+    HashIterator it = sketchIn.iterator();
+    while (it.next()) {
+      final long hash = it.get();
+      if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) {
+        gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed
+      } else {
+        if (sketchIn.isOrdered()) { break; }
       }
     }
     unionThetaLong_ = min(unionThetaLong_, gadget_.getThetaLong()); //Theta rule with gadget
@@ -379,11 +350,8 @@ final class UnionImpl extends Union {
     final int fam = extractFamilyID(skMem);
 
     if (serVer == 4) { // compressed ordered compact
-      // performance can be improved by decompression while performing the union
-      // potentially only partial decompression might be needed
       ThetaUtil.checkSeedHashes(expectedSeedHash_, (short) extractSeedHash(skMem));
-      final CompactSketch csk = CompactSketch.wrap(skMem);
-      union(csk);
+      union(CompactSketch.wrap(skMem));
       return;
     }
     if (serVer == 3) { //The OpenSource sketches (Aug 4, 2015) starts with serVer = 3
@@ -396,16 +364,13 @@ final class UnionImpl extends Union {
     }
     if (serVer == 2) { //older Sketch, which is compact and ordered
       ThetaUtil.checkSeedHashes(expectedSeedHash_, (short)extractSeedHash(skMem));
-      final CompactSketch csk = ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_);
-      union(csk);
+      union(ForwardCompatibility.heapify2to3(skMem, expectedSeedHash_));
       return;
     }
     if (serVer == 1) { //much older Sketch, which is compact and ordered, no seedHash
-      final CompactSketch csk = ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_);
-      union(csk);
+      union(ForwardCompatibility.heapify1to3(skMem, expectedSeedHash_));
       return;
     }
-
     throw new SketchesArgumentException("SerVer is unknown: " + serVer);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to