Github user kaibozhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135184770 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** + * CountDistinct accumulator. + */ + public static class CountDistinctAccum { + public MapView<String, Integer> map; + public long count; + } + + /** + * CountDistinct aggregate. + */ + public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** + * CountDistinct aggregate with merge. 
+ */ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) { + Iterator<CountDistinctAccum> iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator<String> mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** + * CountDistinct aggregate with merge and reset. + */ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** + * CountDistinct aggregate with retract. + */ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); --- End diff -- The condition here is inverted. It should be: ``` if (accumulator.map.contains(String.valueOf(id))) { accumulator.count -= 1; accumulator.map.remove(String.valueOf(id)); } ``` When a record is accumulated, the map gets the entry (record, 1) and count is incremented by 1. On retract, the reverse should happen: if the record is present in the map, decrement count by 1 and remove the record from the map (map.remove(record)).
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---