Github user shaoxuan-wang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3470#discussion_r104301312
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala ---
    @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
           val v = value.asInstanceOf[T]
           val a = accumulator.asInstanceOf[SumAccumulator[T]]
           a.f0 = numeric.plus(v, a.f0)
    -      a.f1 = true
    +      a.f1 += 1
    +    }
    +  }
    +
    +  override def retract(accumulator: Accumulator, value: Any): Unit = {
    +    if (value != null) {
    +      val v = value.asInstanceOf[T]
    +      val a = accumulator.asInstanceOf[SumAccumulator[T]]
    +      a.f0 = numeric.plus(v, a.f0)
    +      a.f1 -= 1
    +      if (a.f1 < 0) {
    --- End diff --
    
    This exception usually won't happen if we use retract only for bounded over 
windows. With the ongoing DataStream retraction design, however, the source 
table can be a table with a primary key, which can generate retraction 
messages. The downstream streaming job may then receive garbage retraction 
messages, because the log of the source table could contain out-of-date 
retractions. The exception here was originally intended as a marker for the 
future retraction design. Let me add a comment here (or maybe just note it 
down myself) and remove the checks for now.
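
To make the scenario concrete, here is a minimal sketch of how a stale 
retraction could be detected without throwing. It is not the code in this PR: 
`SumAcc` and `SumRetractSketch` are hypothetical stand-ins for the real 
accumulator, fixed to `Long` for brevity, and dropping the stale message is 
only one possible policy. The sketch also assumes that retract should subtract 
the value from the running sum.

```scala
// Hypothetical stand-in for the accumulator in the diff:
// f0 is the running sum, f1 counts accumulated values not yet retracted.
final class SumAcc(var f0: Long = 0L, var f1: Long = 0L)

object SumRetractSketch {
  def accumulate(a: SumAcc, value: Any): Unit =
    if (value != null) {
      a.f0 += value.asInstanceOf[Long]
      a.f1 += 1
    }

  // Returns false and leaves the accumulator untouched when there is no
  // matching accumulate, e.g. an out-of-date retraction replayed from the
  // source table. Assumption: such messages are dropped rather than raised.
  def retract(a: SumAcc, value: Any): Boolean =
    if (value == null) true
    else if (a.f1 <= 0) false
    else {
      a.f0 -= value.asInstanceOf[Long]
      a.f1 -= 1
      true
    }
}
```

With this shape the caller can decide whether a `false` result should be 
logged, counted, or ignored once the retraction design settles.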

