GitHub user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184441869 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon override def equals(that: Any): Boolean = that match { case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && - this.mapView == that.mapView + this.distinctValueMap == that.distinctValueMap case _ => false } def add(element: E): Boolean = { - if (element != null) { - val currentVal = mapView.get(element) - if (currentVal != null) { - mapView.put(element, currentVal + 1L) - false - } else { - mapView.put(element, 1L) - true - } - } else { + val wrappedElement = Row.of(element) --- End diff -- Thanks @fhueske for the insight. Yeah, I thought about that before the last commit but didn't go through with it, since we still need to construct a row of a single element before passing it to the distinct accumulator. But you are right, it will make future optimization easier.
---