Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104301312

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
    @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
           val v = value.asInstanceOf[T]
           val a = accumulator.asInstanceOf[SumAccumulator[T]]
           a.f0 = numeric.plus(v, a.f0)
    -      a.f1 = true
    +      a.f1 += 1
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumAccumulator[T]]
    +      a.f0 = numeric.plus(v, a.f0)
    +      a.f1 -= 1
    +      if (a.f1 < 0) {
    --- End diff --

    This exception usually won't happen if we use retract for bounded over windows. With the ongoing DataStream retraction design, the source table can be a table with a primary key, which can generate retraction messages. The downstream streaming job may then receive garbage retraction messages, because the log of the source table could contain out-of-date retractions. The original intent of throwing the exception here was to leave a marker for the future retraction design. Let me add a comment here (or maybe just note it down myself) and remove the checks for now.
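To make the "garbage retraction" case concrete, here is a minimal standalone sketch. It is not the code in this PR and does not use Flink's classes: `SumAcc` and `SumSketch` are hypothetical names, the sketch assumes that a retraction subtracts the value it undoes, and returning `false` for a stale retraction is only one possible policy (the alternatives being to throw, or to skip the check altogether).

```scala
// Hypothetical stand-in for a sum accumulator: running sum plus the number
// of values that are currently accumulated and not yet retracted.
final class SumAcc[T](var sum: T, var count: Long)

// Illustrative only; names and return types are assumptions, not Flink's API.
class SumSketch[T](implicit numeric: Numeric[T]) {

  def createAccumulator(): SumAcc[T] = new SumAcc[T](numeric.zero, 0L)

  // Add a non-null value to the sum and count it.
  def accumulate(acc: SumAcc[T], value: Any): Unit =
    if (value != null) {
      acc.sum = numeric.plus(acc.sum, value.asInstanceOf[T])
      acc.count += 1
    }

  // Undo one earlier accumulate. Returns false, leaving the accumulator
  // untouched, when there is nothing left to retract; that is the case an
  // out-of-date retraction from the source log would trigger.
  def retract(acc: SumAcc[T], value: Any): Boolean =
    if (value == null) true
    else if (acc.count == 0) false
    else {
      acc.sum = numeric.minus(acc.sum, value.asInstanceOf[T])
      acc.count -= 1
      true
    }

  // The sum is only defined while at least one value is accumulated.
  def getValue(acc: SumAcc[T]): Option[T] =
    if (acc.count > 0) Some(acc.sum) else None
}
```

With a bounded over window every retraction matches an earlier accumulate, so `retract` in this sketch would always return `true`; the `false` branch only matters once retractions can arrive from an upstream source.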