[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16179304#comment-16179304 ]
ASF GitHub Bot commented on FLINK-7465: --------------------------------------- Github user jparkie commented on a diff in the pull request: https://github.com/apache/flink/pull/4652#discussion_r140833539 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/functions/aggfunctions/cardinality/HyperLogLog.java --- @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggfunctions.cardinality; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.Serializable; + +/** + * Java implementation of HyperLogLog (HLL) algorithm from this paper: + * <p/> + * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf + * <p/> + * HLL is an improved version of LogLog that is capable of estimating + * the cardinality of a set with accuracy = 1.04/sqrt(m) where + * m = 2^b. 
So we can control accuracy vs space usage by increasing + * or decreasing b. + * <p/> + * The main benefit of using HLL over LL is that it only requires 64% + * of the space that LL does to get the same accuracy. + * <p/> + * <p> + * Note that this implementation does not include the long range correction function + * defined in the original paper. Empirical evidence shows that the correction + * function causes more harm than good. + * </p> + */ +public class HyperLogLog implements ICardinality, Serializable { + + private final RegisterSet registerSet; + private final int log2m; + private final double alphaMM; + + + /** + * Create a new HyperLogLog instance using the specified standard deviation. + * + * @param rsd - the relative standard deviation for the counter. + * smaller values create counters that require more space. + */ + public HyperLogLog(double rsd) { + this(log2m(rsd)); + } + + private static int log2m(double rsd) { + return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2)); + } + + private static double rsd(int log2m) { + return 1.106 / Math.sqrt(Math.exp(log2m * Math.log(2))); + } + + private static void validateLog2m(int log2m) { + if (log2m < 0 || log2m > 30) { + throw new IllegalArgumentException("log2m argument is " + + log2m + " and is outside the range [0, 30]"); + } + } + + private static double linearCounting(int m, double v) { + return m * Math.log(m / v); + } + + /** + * Create a new HyperLogLog instance. The log2m parameter defines the accuracy of + * the counter. The larger the log2m the better the accuracy. + * <p/> + * accuracy = 1.04/sqrt(2^log2m) + * + * @param log2m - the number of bits to use as the basis for the HLL instance + */ + public HyperLogLog(int log2m) { + this(log2m, new RegisterSet(1 << log2m)); + } + + /** + * Creates a new HyperLogLog instance using the given registers. Used for unmarshalling a serialized + * instance and for merging multiple counters together. 
+ * + * @param registerSet - the initial values for the register set + */ + public HyperLogLog(int log2m, RegisterSet registerSet) { + validateLog2m(log2m); + this.registerSet = registerSet; + this.log2m = log2m; + int m = 1 << this.log2m; + + alphaMM = getAlphaMM(log2m, m); + } + + @Override + public boolean offerHashed(long hashedValue) { + // j becomes the binary address determined by the first b log2m of x + // j will be between 0 and 2^log2m + final int j = (int) (hashedValue >>> (Long.SIZE - log2m)); + final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; + return registerSet.updateIfGreater(j, r); + } + + @Override + public boolean offerHashed(int hashedValue) { + // j becomes the binary address determined by the first b log2m of x + // j will be between 0 and 2^log2m + final int j = hashedValue >>> (Integer.SIZE - log2m); + final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1; + return registerSet.updateIfGreater(j, r); + } + + @Override + public boolean offer(Object o) { + final int x = MurmurHash.hash(o); + return offerHashed(x); + } + + @Override + public long cardinality() { + double registerSum = 0; + int count = registerSet.count; + double zeros = 0.0; + for (int j = 0; j < registerSet.count; j++) { + int val = registerSet.get(j); + registerSum += 1.0 / (1 << val); + if (val == 0) { + zeros++; + } + } + + double estimate = alphaMM * (1 / registerSum); + + if (estimate <= (5.0 / 2.0) * count) { + // Small Range Estimate + return Math.round(linearCounting(count, zeros)); + } else { + return Math.round(estimate); + } + } + + @Override + public int sizeof() { + return registerSet.size * 4; + } + + @Override + public byte[] getBytes() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dos = new DataOutputStream(baos); + writeBytes(dos); + + return baos.toByteArray(); + } + + private void writeBytes(DataOutput 
serializedByteStream) throws IOException { + serializedByteStream.writeInt(log2m); + serializedByteStream.writeInt(registerSet.size * 4); + for (int x : registerSet.readOnlyBits()) { + serializedByteStream.writeInt(x); + } + } + + /** + * Add all the elements of the other set to this set. + * <p/> + * This operation does not imply a loss of precision. + * + * @param other A compatible Hyperloglog instance (same log2m) + * @throws Exception if other is not compatible + */ + public void addAll(HyperLogLog other) throws Exception { + if (this.sizeof() != other.sizeof()) { + throw new Exception("Cannot merge estimators of different sizes"); + } + + registerSet.merge(other.registerSet); + } + + @Override + public ICardinality merge(ICardinality... estimators) throws Exception { + HyperLogLog merged = new HyperLogLog(log2m, new RegisterSet(this.registerSet.count)); + merged.addAll(this); + + if (estimators == null) { + return merged; + } + + for (ICardinality estimator : estimators) { + if (!(estimator instanceof HyperLogLog)) { + throw new Exception("Cannot merge estimators of different class"); + } + HyperLogLog hll = (HyperLogLog) estimator; + merged.addAll(hll); + } + + return merged; + } + + private Object writeReplace() { + return new SerializationHolder(this); + } + + /** + * This class exists to support Externalizable semantics for + * HyperLogLog objects without having to expose a public + * constructor, public write/read methods, or pretend final + * fields aren't final. + * + * <p> + * In short, Externalizable allows you to skip some of the more + * verbose meta-data default Serializable gets you, but still + * includes the class name. In that sense, there is some cost + * to this holder object because it has a longer class name. I + * imagine people who care about optimizing for that have their + * own work-around for long class names in general, or just use + * a custom serialization framework. Therefore we make no attempt + * to optimize that here (eg. 
by raising this from an inner class + * and giving it an unhelpful name). + * </p> + */ + private static class SerializationHolder implements Externalizable { + + HyperLogLog hyperLogLogHolder; + + public SerializationHolder(HyperLogLog hyperLogLogHolder) { + this.hyperLogLogHolder = hyperLogLogHolder; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + hyperLogLogHolder.writeBytes(out); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + hyperLogLogHolder = Builder.build(in); + } + + private Object readResolve() { + return hyperLogLogHolder; + } + } + + /** + * Build a HyperLogLog instance. + */ + public static class Builder implements Serializable { --- End diff -- Do we really need a builder when there is only one modifiable parameter? Why pay for an instance of a Builder when you can have normal static factory methods? We can argue the JIT may make it irrelevant, but it's hard to be definitive about the JIT. > Add build-in BloomFilterCount on TableAPI&SQL > --------------------------------------------- > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: sunjincheng > Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr] I would appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)