Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135128744 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java --- @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) { accumulator.count -= iWeight; } } + + /** + * CountDistinct accumulator. + */ + public static class CountDistinctAccum { + public MapView<String, Integer> map; + public long count; + } + + /** + * CountDistinct aggregate. + */ + public static class CountDistinct extends AggregateFunction<Long, CountDistinctAccum> { + + @Override + public CountDistinctAccum createAccumulator() { + CountDistinctAccum accum = new CountDistinctAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); + accum.count = 0L; + return accum; + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, String id) { + try { + if (!accumulator.map.contains(id)) { + accumulator.map.put(id, 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded accumulate method + public void accumulate(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.put(String.valueOf(id), 1); + accumulator.count += 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public Long getValue(CountDistinctAccum accumulator) { + return accumulator.count; + } + } + + /** + * CountDistinct aggregate with merge. + */ + public static class CountDistinctWithMerge extends CountDistinct { + + //Overloaded merge method + public void merge(CountDistinctAccum acc, Iterable<CountDistinctAccum> it) { + Iterator<CountDistinctAccum> iter = it.iterator(); + while (iter.hasNext()) { + CountDistinctAccum mergeAcc = iter.next(); + acc.count += mergeAcc.count; + + try { + Iterator<String> mapItr = mergeAcc.map.keys().iterator(); + while (mapItr.hasNext()) { + String key = mapItr.next(); + if (!acc.map.contains(key)) { + acc.map.put(key, 1); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + /** + * CountDistinct aggregate with merge and reset. + */ + public static class CountDistinctWithMergeAndReset extends CountDistinctWithMerge { + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** + * CountDistinct aggregate with retract. + */ + public static class CountDistinctWithRetractAndReset extends CountDistinct { + + //Overloaded retract method + public void retract(CountDistinctAccum accumulator, long id) { + try { + if (!accumulator.map.contains(String.valueOf(id))) { + accumulator.map.remove(String.valueOf(id)); + accumulator.count -= 1; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Overloaded retract method + public void resetAccumulator(CountDistinctAccum acc) { + acc.map.clear(); + acc.count = 0; + } + } + + /** + * Accumulator for test DataView. + */ + public static class DataViewTestAccum { + public MapView<String, Integer> map; + public MapView<String, Integer> map2; + public long count; + private ListView<Long> list = new ListView<>(Types.LONG); + + public ListView<Long> getList() { + return list; + } + + public void setList(ListView<Long> list) { + this.list = list; + } + } + + /** + * Aggregate for test DataView. + */ + public static class DataViewTestAgg extends AggregateFunction<Long, DataViewTestAccum> { + + @Override + public DataViewTestAccum createAccumulator() { + DataViewTestAccum accum = new DataViewTestAccum(); + accum.map = new MapView<>(Types.STRING, Types.INT); --- End diff -- was `map2` not initialized on purpose?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---