So from what I understand, the aggregate itself is not a "stateful operator", but one may follow it? How does the aggregate accumulator keep old values, then? It can't all just live in memory. Actually, looking at the savepoints, it looks like there is state associated with our aggregate operator.
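For concreteness, here is a minimal sketch of the accumulate/retract pattern in question. It is plain Python, not the actual Flink class: `CollectGroupIds`, `GroupIdsAccumulator`, and the `state` dict standing in for keyed state are all hypothetical names chosen for illustration, and the dict only models the concern rather than what Flink really does with the entry.

```python
# Plain-Python sketch of the accumulate/retract pattern; not the Flink API.
from dataclasses import dataclass, field


@dataclass
class GroupIdsAccumulator:
    # Hypothetical accumulator: the group ids currently held for one user.
    group_ids: list = field(default_factory=list)


class CollectGroupIds:
    """Hypothetical aggregate mirroring the accumulate/retract contract."""

    def create_accumulator(self):
        return GroupIdsAccumulator()

    def accumulate(self, acc, group_id):
        # Add the group id if it is not already present.
        if group_id not in acc.group_ids:
            acc.group_ids.append(group_id)

    def retract(self, acc, group_id):
        # Remove the group id; the accumulator itself stays around.
        if group_id in acc.group_ids:
            acc.group_ids.remove(group_id)

    def get_value(self, acc):
        return list(acc.group_ids)


agg = CollectGroupIds()
state = {}  # assumption: stands in for keyed state, user_id -> accumulator

state["user_id_1"] = agg.create_accumulator()
agg.accumulate(state["user_id_1"], "group_id_1")
agg.retract(state["user_id_1"], "group_id_1")

# The function can only empty the accumulator; it cannot drop the key itself.
print(agg.get_value(state["user_id_1"]))  # []
print("user_id_1" in state)  # True
```

In this model the empty entry survives because nothing outside the function removes it, which is exactly the behavior I'm unsure about in Flink.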
To clarify my concern as well: in the retract function of my aggregate function class, all I do is remove a value (a group id) from the accumulator set (which is an array). For example, if there is only one group_id left for a user and it gets deleted, that group_id is removed from the accumulator set and the set becomes empty. I would hope that at that point, given that no rows remain for the aggregate, either I could delete the associated stored accumulator altogether or Flink would do it, i.e. delete `user_id_1 -> []`. Is it possible that both the groups and the user need to be deleted for everything to clear from storage? That might make more sense, actually.

If this doesn't happen, then since users delete themselves and their groups all the time, we'll be storing all these empty sets in RocksDB for no reason.

To clarify, we're using Debezium as our source and Flink as a materialization engine, so we never want to explicitly set a timeout on any of our data; we just want to scale up predictably with our user growth.

Thanks!

On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:

> Hi, Rex Fenley ~
>
> If there is stateful operator as the output of the aggregate function.
> Then each time the function receives an update (or delete) for the key, the
> agg operator would emit 2 messages, one for retracting the old record, one
> for the new message. For your case, the new message is the DELETE.
>
> If there is no stateful operator, the aggregate operator would just emit
> the update after (the new) message which is the delete.
>
> On Wed, Dec 9, 2020 at 4:30 AM Rex Fenley <r...@remind101.com> wrote:
>
>> Hello,
>>
>> I'd like to better understand delete behavior of AggregateFunctions.
>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>> groups belonging to that user.
>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>> Now let's assume sometime later that deletes arrive for all rows which
>> produce user_id_1's group_id's.
>>
>> Would the aggregate function completely delete the associated state from
>> RocksDB or would it leave something like `user_id_1 -> []` sitting in
>> RocksDB forever?
>>
>> We have an aggregate similar to this where users could delete themselves
>> and we want to make sure we're not accumulating data forever for those
>> users.
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley | Software Engineer - Mobile and Backend
>>
>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>> <https://www.facebook.com/remindhq>