This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 4b396ab KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF
4b396ab is described below
commit 4b396ab86152a7f72aecb65e69f87441394d7373
Author: Zhichao Zhang <[email protected]>
AuthorDate: Thu Sep 10 09:51:38 2020 +0800
KYLIN-4751 Fix NPE issue when run test case TestTopNUDAF
---
.../org/apache/kylin/measure/topn/Counter.java | 34 ++++++--
.../org/apache/kylin/measure/topn/TopNCounter.java | 94 +++++++++++++++-------
.../apache/kylin/measure/topn/TopNMeasureType.java | 22 ++---
.../kylin/measure/topn/TopNCounterBasicTest.java | 4 +-
.../sql_topn/{query45.sql.disable => query45.sql} | 2 +-
.../src/test/resources/query/sql_topn/query81.sql | 4 +-
.../kylin/engine/spark/job/TestTopNUDAF.scala | 3 +-
7 files changed, 107 insertions(+), 56 deletions(-)
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
index 219c712..42422df 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/Counter.java
@@ -18,6 +18,10 @@
package org.apache.kylin.measure.topn;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.io.Serializable;
/**
@@ -25,10 +29,10 @@ import java.io.Serializable;
*
* @param <T>
*/
-public class Counter<T> implements Serializable{
+public class Counter<T> implements Externalizable, Serializable {
protected T item;
- protected double count;
+ protected Double count;
/**
* For de-serialization
@@ -37,30 +41,46 @@ public class Counter<T> implements Serializable{
}
public Counter(T item) {
- this.count = 0;
+ this.count = 0d;
this.item = item;
}
- public Counter(T item, double count) {
+ public Counter(T item, Double count) {
this.item = item;
this.count = count;
}
-
public T getItem() {
return item;
}
- public double getCount() {
+ public Double getCount() {
return count;
}
- public void setCount(double count) {
+ public void setItem(T item) {
+ this.item = item;
+ }
+
+ public void setCount(Double count) {
this.count = count;
}
+
@Override
public String toString() {
return item + ":" + count;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ item = (T) in.readObject();
+ count = in.readDouble();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(item);
+ out.writeDouble(count);
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 979d591..2352bbd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -19,7 +19,6 @@
package org.apache.kylin.measure.topn;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -46,7 +45,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
protected int capacity;
private HashMap<T, Counter<T>> counterMap;
- protected LinkedList<Counter<T>> counterList; //a linked list, first the
is the toppest element
+ protected LinkedList<Counter<T>> counterList; //a linked list, first one
is the toppest element
private boolean ordered = true;
private boolean descending = true;
@@ -75,17 +74,30 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
* Algorithm: <i>Space-Saving</i>
*
* @param item stream element (<i>e</i>)
- * @return false if item was already in the stream summary, true otherwise
*/
- public void offer(T item, double incrementCount) {
+ public void offer(T item, Double incrementCount) {
Counter<T> counterNode = counterMap.get(item);
+
if (counterNode == null) {
- counterNode = new Counter<T>(item, incrementCount);
+ if (size() < capacity) {
+ counterNode = new Counter<>(item, null);
+ if (this.descending) {
+ counterList.addLast(counterNode);
+ } else {
+ counterList.addFirst(counterNode);
+ }
+ } else {
+ // the min item should be dropped
+ if (!ordered) {
+ sort();
+ }
+ counterNode = this.descending ? counterList.getLast() :
counterList.getFirst();
+ counterMap.remove(counterNode.getItem());
+ counterNode.setItem(item);
+ }
counterMap.put(item, counterNode);
- counterList.add(counterNode);
- } else {
- counterNode.setCount(counterNode.getCount() + incrementCount);
}
+ incrementCounter(counterNode, incrementCount);
ordered = false;
}
@@ -93,9 +105,8 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
* Sort and keep the expected size;
*/
public void sortAndRetain() {
- Collections.sort(counterList, this.descending ? DESC_COMPARATOR :
ASC_COMPARATOR);
+ sort();
retain(capacity);
- ordered = true;
}
public List<Counter<T>> topK(int k) {
@@ -143,7 +154,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
* @param item
* @param count
*/
- public void offerToHead(T item, double count) {
+ public void offerToHead(T item, Double count) {
Counter<T> c = new Counter<T>(item, count);
counterList.addFirst(c);
counterMap.put(c.item, c);
@@ -160,26 +171,20 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
double m1 = thisFull ? this.counterList.getLast().count : 0.0;
double m2 = anotherFull ? another.counterList.getLast().count : 0.0;
- if (anotherFull == true) {
+ if (anotherFull) {
for (Counter<T> entry : this.counterMap.values()) {
entry.count += m2;
}
}
for (Map.Entry<T, Counter<T>> entry : another.counterMap.entrySet()) {
- Counter<T> counter = this.counterMap.get(entry.getKey());
- if (counter != null) {
- // this.offer(entry.getValue().getItem(),
(entry.getValue().count - m2));
- counter.setCount(counter.getCount() + (entry.getValue().count
- m2));
+ if (this.counterMap.containsKey(entry.getKey())) {
+ this.offer(entry.getValue().getItem(), (entry.getValue().count
- m2));
} else {
- // this.offer(entry.getValue().getItem(),
entry.getValue().count + m1);
- counter = new Counter<T>(entry.getValue().getItem(),
entry.getValue().count + m1);
- this.counterMap.put(entry.getValue().getItem(), counter);
- this.counterList.add(counter);
+ this.offer(entry.getValue().getItem(), entry.getValue().count
+ m1);
}
}
this.ordered = false;
-
this.sortAndRetain();
return this;
}
@@ -226,33 +231,60 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
public TopNCounter<T> copy() {
TopNCounter result = new TopNCounter(capacity);
result.counterMap = Maps.newHashMap(counterMap);
+ result.counterList = Lists.newLinkedList(counterList);
return result;
}
@Override
public Iterator<Counter<T>> iterator() {
- if (this.descending == true) {
+ if (this.descending) {
return this.counterList.descendingIterator();
} else {
throw new IllegalStateException(); // support in future
}
}
- static final Comparator ASC_COMPARATOR = new Comparator<Counter>() {
- @Override
- public int compare(Counter o1, Counter o2) {
- return Double.compare(o1.getCount(), o2.getCount());
+ static final Comparator<Counter> ASC_COMPARATOR = (Counter o1, Counter o2)
-> {
+ if (o1.getCount() == null) {
+ if (o2.getCount() == null)
+ return 0;
+ else
+ return -1;
}
+ if (o2.getCount() == null) {
+ return 1;
+ }
+ return Double.compare(o1.getCount(), o2.getCount());
+ };
+ static final Comparator<Counter> DESC_COMPARATOR = (Counter o1, Counter
o2) -> {
+ if (o1.getCount() == null) {
+ if (o2.getCount() == null)
+ return 0;
+ else
+ return 1;
+ }
+ if (o2.getCount() == null) {
+ return -1;
+ }
+ return Double.compare(o2.getCount(), o1.getCount());
};
- static final Comparator DESC_COMPARATOR = new Comparator<Counter>() {
- @Override
- public int compare(Counter o1, Counter o2) {
- return Double.compare(o2.getCount(), o1.getCount());
+ private void incrementCounter(Counter<T> counterNode, Double
incrementCount) {
+ if (incrementCount == null) {
+ return;
+ }
+ if (counterNode.getCount() == null) {
+ counterNode.setCount(incrementCount);
+ } else {
+ counterNode.setCount(counterNode.getCount() + incrementCount);
}
+ }
- };
+ private void sort() {
+ counterList.sort(this.descending ? DESC_COMPARATOR : ASC_COMPARATOR);
+ ordered = true;
+ }
public void reset() {
counterList.clear();
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 518272e..31d7fb4 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -106,10 +106,10 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
}
private void validate(String funcName, DataType dataType, boolean
checkDataType) {
- if (FUNC_TOP_N.equals(funcName) == false)
+ if (!FUNC_TOP_N.equals(funcName))
throw new IllegalArgumentException();
- if (DATATYPE_TOPN.equals(dataType.getName()) == false)
+ if (!DATATYPE_TOPN.equals(dataType.getName()))
throw new IllegalArgumentException();
if (dataType.getPrecision() < 1 || dataType.getPrecision() > 10000)
@@ -186,7 +186,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
}
}
- if (needReEncode == false) {
+ if (!needReEncode) {
// no need re-encode
return topNCounter;
}
@@ -312,7 +312,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
};
}
- if (digest.aggregations.size() == 0) {
+ if (digest.aggregations.isEmpty()) {
// directly query the UHC column without sorting
boolean b = unmatchedDimensions.removeAll(literalCol);
if (b) {
@@ -377,7 +377,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
return false;
}
- if (sum.isSum() == false)
+ if (!sum.isSum())
return false;
if (sum.getParameter() == null || sum.getParameter().getColRefs() ==
null
@@ -411,18 +411,18 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
FunctionDesc topnFunc = measureDesc.getFunction();
List<TblColRef> topnLiteralCol = getTopNLiteralColumn(topnFunc);
- if (sqlDigest.groupbyColumns.containsAll(topnLiteralCol) == false)
{
+ if (!sqlDigest.groupbyColumns.containsAll(topnLiteralCol)) {
continue;
}
- if (sqlDigest.aggregations.size() > 0) {
+ if (!sqlDigest.aggregations.isEmpty()) {
FunctionDesc origFunc =
sqlDigest.aggregations.iterator().next();
- if (origFunc.isSum() == false && origFunc.isCount() == false) {
+ if (!origFunc.isSum() && !origFunc.isCount()) {
logger.warn("When query with topN, only SUM/Count function
is allowed.");
return;
}
- if (isTopNCompatibleSum(measureDesc.getFunction(), origFunc)
== false) {
+ if (!isTopNCompatibleSum(measureDesc.getFunction(), origFunc))
{
continue;
}
@@ -549,7 +549,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
}
private TblColRef getTopNNumericColumn(FunctionDesc functionDesc) {
- if (functionDesc.getParameter().isColumnType() == true) {
+ if (functionDesc.getParameter().isColumnType()) {
return functionDesc.getParameter().getColRefs().get(0);
}
return null;
@@ -557,7 +557,7 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
private List<TblColRef> getTopNLiteralColumn(FunctionDesc functionDesc) {
List<TblColRef> allColumns = functionDesc.getParameter().getColRefs();
- if (functionDesc.getParameter().isColumnType() == false) {
+ if (!functionDesc.getParameter().isColumnType()) {
return allColumns;
}
return allColumns.subList(1, allColumns.size());
diff --git a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
index 506ecf3..48bf678 100644
--- a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -65,7 +65,7 @@ public class TopNCounterBasicTest {
TopNCounter<String> vs = new TopNCounter<String>(3);
String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A",
"C", "A", "A" };
for (String i : stream) {
- vs.offer(i, 10);
+ vs.offer(i, 10d);
}
List<Counter<String>> topK = vs.topK(3);
for (Counter<String> c : topK) {
@@ -78,7 +78,7 @@ public class TopNCounterBasicTest {
TopNCounter<String> vs_increment = new TopNCounter<String>(3);
TopNCounter<String> vs_single = new TopNCounter<String>(3);
String[] stream = { "A", "B", "C", "D", "A" };
- Integer[] increments = { 15, 20, 25, 30, 1 };
+ Double[] increments = { 15d, 20d, 25d, 30d, 1d };
for (int i = 0; i < stream.length; i++) {
vs_increment.offer(stream[i], increments[i]);
diff --git a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable b/kylin-it/src/test/resources/query/sql_topn/query45.sql
similarity index 95%
rename from kylin-it/src/test/resources/query/sql_topn/query45.sql.disable
rename to kylin-it/src/test/resources/query/sql_topn/query45.sql
index 39f9571..8940c2e 100644
--- a/kylin-it/src/test/resources/query/sql_topn/query45.sql.disable
+++ b/kylin-it/src/test/resources/query/sql_topn/query45.sql
@@ -20,4 +20,4 @@
select seller_id, sum(price) as s from test_kylin_fact
where lstg_format_name='FP-GTC'
- group by seller_id
+ group by seller_id order by s desc limit 10
diff --git a/kylin-it/src/test/resources/query/sql_topn/query81.sql b/kylin-it/src/test/resources/query/sql_topn/query81.sql
index 93868e7..1fedef8 100644
--- a/kylin-it/src/test/resources/query/sql_topn/query81.sql
+++ b/kylin-it/src/test/resources/query/sql_topn/query81.sql
@@ -17,7 +17,7 @@
--
select test_cal_dt.week_beg_dt, sum(price) as GMV
- from test_kylin_fact
+ from test_kylin_fact
inner JOIN edw.test_cal_dt as test_cal_dt
ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
inner JOIN test_category_groupings
@@ -25,4 +25,4 @@ inner JOIN edw.test_cal_dt as test_cal_dt
inner JOIN edw.test_sites as test_sites
ON test_kylin_fact.lstg_site_id = test_sites.site_id
where test_cal_dt.week_beg_dt between DATE '2013-09-01' and DATE '2013-10-01'
and (lstg_format_name='FP-GTC' or 'a' = 'b')
- group by test_cal_dt.week_beg_dt
\ No newline at end of file
+ group by test_cal_dt.week_beg_dt, test_kylin_fact.seller_id order by GMV desc
limit 10
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
index e799193..ff4f63c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestTopNUDAF.scala
@@ -27,8 +27,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
class TestTopNUDAF extends SparderBaseFunSuite with SharedSparkSession {
- //ignore temporary
- ignore("basic") {
+ test("basic") {
val schema = StructType(Array(
StructField("rowKey", IntegerType, nullable = true),