Hi Adam,

Thanks for reporting this issue. The bug you identified has been fixed in
the release-1.18 and master branches, which will be released in v1.18.2 and
v1.19.0. You can now use the `emitUpdateWithRetract` method with the
expected behavior. More details can be found here[1].


[1] https://issues.apache.org/jira/browse/FLINK-31788

Best,
Jane

On Wed, Apr 12, 2023 at 11:17 PM Feng Jin <jinfeng1...@gmail.com> wrote:

> hi Adam
>
> As far as I know, there is currently no similar API available,
> but I believe that this feature was accidentally removed and we should add
> it back.
> I have created a Jira to track the progress of this feature.
> https://issues.apache.org/jira/browse/FLINK-31788
>
>
>
> On Tue, Apr 11, 2023 at 12:10 AM Adam Augusta <rox...@gmail.com> wrote:
>
>> Many thanks for the sanity check, Feng.
>>
>> It’s a shame this well-documented feature was silently removed.
>> emitValue() creates an unreasonable amount of unnecessary and disruptive
>> chatter on the changelog stream, as evidenced by putting a print table
>> after the flatAggregate. Lots of -D/+I RowData pairs with identical fields.
>>
>> Is there any clean way to set up a stateful group aggregation in the 1.18
>> Table API that doesn’t misbehave in this fashion?
>>
>> On Mon, Apr 10, 2023 at 11:43 AM Feng Jin <jinfeng1...@gmail.com> wrote:
>>
>>> hi Adam
>>>
>>> I have checked the code and indeed this feature is not available in the
>>> latest version of Flink code.
>>>
>>> This feature was originally implemented in the old planner:
>>> <https://github.com/apache/flink/pull/8550/files>
>>> https://github.com/apache/flink/pull/8550/files
>>>
>>> However, this logic was not implemented in the new planner , the Blink
>>> planner.
>>>
>>> With the removal of the old planner in version 1.14
>>> https://github.com/apache/flink/pull/16080 , this code was also removed.
>>>
>>>
>>>
>>> Best
>>>
>>> Feng
>>>
>>> On Sat, Apr 8, 2023 at 4:17 AM Adam Augusta <rox...@gmail.com> wrote:
>>>
>>>> The TableAggregateFunction javadocs indicate that either "emitValue" or
>>>> "emitUpdateWithRetract" is required.
>>>>
>>>> But if I implement my TableAggregateFunction with
>>>> "emitUpdateWithRetract", I get a validation error. If I implement both
>>>> methods it works, but emitUpdateWithRetract is not used.
>>>>
>>>> Peering into the Flink source code, I see that
>>>> ImperativeAggCodeGen validates the presence of emitValue, but is agnostic
>>>> to emitUpdateWithRetract.
>>>> More curiously, Flink's source code doesn't have a single test with a
>>>> TableAggregateFunction that uses emitUpdateWithRetract.
>>>>
>>>> Is this a ghost feature?
>>>>
>>>> Thanks,
>>>> Adam
>>>>
>>>

Reply via email to