HuangXingBo commented on a change in pull request #13854: URL: https://github.com/apache/flink/pull/13854#discussion_r515916741
########## File path: flink-python/pyflink/table/functions.py ########## @@ -179,3 +740,69 @@ class DecimalSum0AggFunction(Sum0AggFunction): def create_accumulator(self): # [sum] return [Decimal('0')] + + +class SumAggFunction(AggregateFunction): + + def get_value(self, accumulator): + return accumulator[0] + + def create_accumulator(self): + # [sum] + return [None] + + def accumulate(self, accumulator, *args): + if args[0] is not None: + if accumulator[0] is None: + accumulator[0] = args[0] + else: + accumulator[0] += args[0] + + def retract(self, accumulator, *args): + raise NotImplementedError("This function does not support retraction.") + + def merge(self, accumulator, accumulators): + for acc in accumulators: + if acc[0] is not None: + if accumulator[0] is None: + accumulator[0] = acc[0] + else: + accumulator[0] += acc[0] + + +class SumWithRetractAggFunction(AggregateFunction): + + def get_value(self, accumulator): + if accumulator[1] == 0: + return None + else: + return accumulator[0] + + def create_accumulator(self): + # [sum, count] + return [None, 0] Review comment: I think we can optimize the initial value of `sum` to `0`, so that we can remove the logic of deciding whether `accumulator[0]` is None. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org