Hi Adam,

Thanks for reporting this issue. The bug you identified has been fixed on the release-1.18 and master branches, and the fix will ship in v1.18.2 and v1.19.0. From those versions on, you can use the `emitUpdateWithRetract` method with the expected behavior. More details can be found here [1].
[1] https://issues.apache.org/jira/browse/FLINK-31788

Best,
Jane

On Wed, Apr 12, 2023 at 11:17 PM Feng Jin <jinfeng1...@gmail.com> wrote:

> hi Adam
>
> As far as I know, there is currently no similar API available,
> but I believe that this feature was accidentally removed and we should add
> it back.
> I have created a Jira to track the progress of this feature.
> https://issues.apache.org/jira/browse/FLINK-31788
>
> On Tue, Apr 11, 2023 at 12:10 AM Adam Augusta <rox...@gmail.com> wrote:
>
>> Many thanks for the sanity check, Feng.
>>
>> It’s a shame this well-documented feature was silently removed.
>> emitValue() creates an unreasonable amount of unnecessary and disruptive
>> chatter on the changelog stream, as evidenced by putting a print table
>> after the flatAggregate. Lots of -D/+I RowData pairs with identical fields.
>>
>> Is there any clean way to set up a stateful group aggregation in the 1.18
>> Table API that doesn’t misbehave in this fashion?
>>
>> On Mon, Apr 10, 2023 at 11:43 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>
>>> hi Adam
>>>
>>> I have checked the code and indeed this feature is not available in the
>>> latest version of Flink code.
>>>
>>> This feature was originally implemented in the old planner:
>>> https://github.com/apache/flink/pull/8550/files
>>>
>>> However, this logic was not implemented in the new planner, the Blink
>>> planner.
>>>
>>> With the removal of the old planner in version 1.14
>>> https://github.com/apache/flink/pull/16080 , this code was also removed.
>>>
>>> Best
>>>
>>> Feng
>>>
>>> On Sat, Apr 8, 2023 at 4:17 AM Adam Augusta <rox...@gmail.com> wrote:
>>>
>>>> The TableAggregateFunction javadocs indicate that either "emitValue" or
>>>> "emitUpdateWithRetract" is required.
>>>>
>>>> But if I implement my TableAggregateFunction with
>>>> "emitUpdateWithRetract", I get a validation error. If I implement both
>>>> methods it works, but emitUpdateWithRetract is not used.
>>>>
>>>> Peering into the Flink source code, I see that
>>>> ImperativeAggCodeGen validates the presence of emitValue, but is agnostic
>>>> to emitUpdateWithRetract.
>>>> More curiously, Flink's source code doesn't have a single test with a
>>>> TableAggregateFunction that uses emitUpdateWithRetract.
>>>>
>>>> Is this a ghost feature?
>>>>
>>>> Thanks,
>>>> Adam
>>>>
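
To make the changelog "chatter" discussed in this thread concrete, here is a minimal plain-Java sketch. It is not Flink code and does not call any Flink API. It assumes a top-2 aggregation and models two emission styles: one that re-emits the full result on every input (the `-D`/`+I` pairs with identical fields that Adam describes for `emitValue`), and one that emits only the rows that actually changed (the intent behind `emitUpdateWithRetract`). The class name `Top2ChangelogSketch`, the `Acc` accumulator, and the string-based changelog are all hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these types come from Flink.
final class Top2ChangelogSketch {
    // Sentinel meaning "no value yet" (assumption made for this sketch).
    static final int NONE = Integer.MIN_VALUE;

    // Hypothetical accumulator: current top two values plus the last emitted ones.
    static final class Acc {
        int first = NONE, second = NONE;
        int oldFirst = NONE, oldSecond = NONE;
    }

    static void accumulate(Acc acc, int v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    // Full re-emission: delete every previously emitted row, then insert the whole result.
    static List<String> emitValueStyle(int[] input) {
        List<String> log = new ArrayList<>();
        List<String> previous = new ArrayList<>();
        Acc acc = new Acc();
        for (int v : input) {
            accumulate(acc, v);
            List<String> current = new ArrayList<>();
            if (acc.first != NONE) current.add("(" + acc.first + ",1)");
            if (acc.second != NONE) current.add("(" + acc.second + ",2)");
            for (String row : previous) log.add("-D" + row);
            for (String row : current) log.add("+I" + row);
            previous = current;
        }
        return log;
    }

    // Incremental emission: retract and insert only the ranks whose value changed.
    static List<String> emitUpdateWithRetractStyle(int[] input) {
        List<String> log = new ArrayList<>();
        Acc acc = new Acc();
        for (int v : input) {
            accumulate(acc, v);
            if (acc.first != acc.oldFirst) {
                if (acc.oldFirst != NONE) log.add("-D(" + acc.oldFirst + ",1)");
                log.add("+I(" + acc.first + ",1)");
                acc.oldFirst = acc.first;
            }
            if (acc.second != acc.oldSecond) {
                if (acc.oldSecond != NONE) log.add("-D(" + acc.oldSecond + ",2)");
                log.add("+I(" + acc.second + ",2)");
                acc.oldSecond = acc.second;
            }
        }
        return log;
    }

    public static void main(String[] args) {
        int[] input = {5, 3, 1, 2};
        System.out.println("full re-emission: " + emitValueStyle(input));
        System.out.println("incremental:      " + emitUpdateWithRetractStyle(input));
    }
}
```

In this sketch, inputs that leave the top two unchanged (here `1` and `2`) still produce delete/insert pairs under full re-emission, while the incremental style stays silent. How the real Flink runtime behaves after the fix is described in FLINK-31788, not by this model.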