So, from what I understand, the aggregate itself is not a "stateful
operator", but one may follow it? How does the aggregate accumulator keep
old values, then? It can't all just live in memory. In fact, looking at
our savepoints, there appears to be state associated with our aggregate
operator.

To clarify my concern: in the retract function of my aggregate function
class, all I do is remove a value (a group id) from the accumulator set
(which is stored as an array). For example, if only one group_id is left
for a user and it gets deleted, that group_id is removed from the
accumulator set, leaving the set empty. At that point, since no rows remain
for the aggregate, I would hope that either I could delete the stored
accumulator altogether or Flink would do so itself, i.e. delete
`user_id_1 -> []`. Or is it possible that both the groups and the user need
to be deleted for everything to clear from storage? That might make more
sense, actually.
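
To make the retract behavior above concrete, here is a minimal sketch in
plain Java. It mirrors the method names of Flink's AggregateFunction
(createAccumulator, accumulate, retract, getValue) but deliberately does not
depend on Flink, and the class and field names are hypothetical, so treat it
as an approximation rather than the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the aggregate described above. It does NOT extend
// Flink's AggregateFunction; the class and field names are hypothetical.
class GroupIdSetAggSketch {

    // Accumulator: the "set" of group ids, stored as a list.
    static class Acc {
        final List<Long> groupIds = new ArrayList<>();
    }

    Acc createAccumulator() {
        return new Acc();
    }

    // Called for each inserted row: add the group id if not already present.
    void accumulate(Acc acc, Long groupId) {
        if (!acc.groupIds.contains(groupId)) {
            acc.groupIds.add(groupId);
        }
    }

    // Called for each retracted (deleted) row: remove the group id.
    // Long resolves to List.remove(Object), so this removes by value.
    void retract(Acc acc, Long groupId) {
        acc.groupIds.remove(groupId);
    }

    // Result emitted for the key: a copy of the current group ids.
    List<Long> getValue(Acc acc) {
        return new ArrayList<>(acc.groupIds);
    }

    public static void main(String[] args) {
        GroupIdSetAggSketch agg = new GroupIdSetAggSketch();
        Acc acc = agg.createAccumulator();
        agg.accumulate(acc, 1L);
        agg.retract(acc, 1L);
        // The accumulator object still exists here, but its set is empty.
        System.out.println(agg.getValue(acc).isEmpty());
    }
}
```

Nothing in retract itself discards the accumulator: after the last group_id
is removed, an empty accumulator is all that remains, and whether the stored
entry is then dropped is up to the runtime, which is exactly my question.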

If this doesn't happen, then since users delete themselves and their groups
all the time, we'll be storing all of these empty sets in RocksDB for no
reason. For context, we're using Debezium as our source and Flink as a
materialization engine, so we never want to explicitly set a timeout on any
of our data; we just want to scale up predictably with our user growth.

Thanks!

On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:

> Hi, Rex Fenley ~
>
> If there is a stateful operator downstream of the aggregate function,
> then each time the function receives an update (or delete) for the key, the
> agg operator emits 2 messages: one retracting the old record and one
> carrying the new message. In your case, the new message is the DELETE.
>
> If there is no stateful operator, the aggregate operator just emits
> the update-after (the new) message, which is the delete.
>
> On Wed, Dec 9, 2020 at 4:30 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Hello,
>>
>> I'd like to better understand delete behavior of AggregateFunctions.
>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>> groups belonging to that user.
>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>> Now let's assume that sometime later, deletes arrive for all rows that
>> produce user_id_1's group_ids.
>>
>> Would the aggregate function completely delete the associated state from
>> RocksDB or would it leave something like `user_id_1 -> []` sitting in
>> RocksDB forever?
>>
>> We have an aggregate similar to this where users could delete themselves
>> and we want to make sure we're not accumulating data forever for those
>> users.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>

