No, the group aggregate, stream-stream join, and rank are all stateful operators; they need a state backend to keep track of the accumulator values.
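To make the bookkeeping concrete, here is a minimal plain-Python model of a keyed group aggregate like the `user_id -> [group_ids]` example in this thread. It is not Flink's API. Assumptions: the dict `state` stands in for the state backend (e.g. RocksDB), the class and method names are hypothetical, and a per-`group_id` row count is kept so duplicate input rows retract correctly. It also assumes that an entry is dropped once no input rows remain for its key. On a retraction, the model re-evaluates the accumulator without the retracted row.

```python
from collections import Counter


class GroupAggSketch:
    """Hypothetical model of a keyed group aggregate: user_id -> group_ids.

    `self.state` stands in for the state backend that stores one
    accumulator per key (this is not Flink's API)."""

    def __init__(self):
        # user_id -> Counter(group_id -> number of live input rows)
        self.state = {}

    def accumulate(self, user_id, group_id):
        """Apply an insert (+I) of a (user_id, group_id) row."""
        self.state.setdefault(user_id, Counter())[group_id] += 1

    def retract(self, user_id, group_id):
        """Apply a delete (-D): undo the row's contribution to the accumulator."""
        acc = self.state.get(user_id)
        if acc is None or acc[group_id] == 0:
            return  # nothing stored for this row, so nothing to retract
        acc[group_id] -= 1
        if acc[group_id] == 0:
            del acc[group_id]
        # Assumption: when no input rows remain for the key, drop the whole
        # entry instead of leaving `user_id -> []` in the state backend.
        if not acc:
            del self.state[user_id]

    def result(self, user_id):
        """Current aggregate value for the key: its group_ids, sorted."""
        return sorted(self.state.get(user_id, Counter()))
```

Usage, following the scenario where every group of `user_id_1` is deleted:

```python
agg = GroupAggSketch()
agg.accumulate("user_id_1", "group_id_1")
agg.accumulate("user_id_1", "group_id_2")
print(agg.result("user_id_1"))   # ['group_id_1', 'group_id_2']
agg.retract("user_id_1", "group_id_1")
agg.retract("user_id_1", "group_id_2")
print("user_id_1" in agg.state)  # False
```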
Retractions only need to be emitted when the stateful operator A has a downstream operator B that is also stateful, because B needs the retractions to correct its accumulators. If B is not stateful, emitting just the new record, which overrides the old one, is enough. When you receive a retraction message, you only need to correct the accumulator state to what it should be (for example, re-evaluate the accumulator without the retracted record).

On Thu, Dec 10, 2020 at 2:44 AM Rex Fenley <r...@remind101.com> wrote:

> So from what I'm understanding, the aggregate itself is not a "stateful
> operator" but one may follow it? How does the aggregate accumulator keep
> old values then? It can't all just live in memory; actually, looking at the
> savepoints, it looks like there's state associated with our aggregate
> operator.
>
> To clarify my concern, in my retract function implementation in the
> aggregate function class, all I do is remove a value (a group id) from the
> accumulator set (which is an array). For example, if there is only 1
> group_id left for a user and it gets deleted, that group_id will be removed
> from the accumulator set and the set will be empty. I would hope that at
> that point, given that there are no remaining rows for the aggregate, I
> could, or Flink will, just delete the associated stored accumulator
> altogether, i.e. delete `user_id_1 -> []`. Is it possible that both the
> groups and the user need to be deleted for everything to clear from
> storage? That might make more sense actually.
>
> If this doesn't happen, since users delete themselves and their groups all
> the time, we'll be storing all these empty data sets in RocksDB for no
> reason. To clarify, we're using Debezium as our source and using Flink as a
> materialization engine, so we never want to explicitly set a timeout on any
> of our data; we just want to scale up predictably with our user growth.
>
> Thanks!
>
> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:
>
>> Hi, Rex Fenley ~
>>
>> If there is a stateful operator consuming the output of the aggregate
>> function, then each time the function receives an update (or delete) for
>> the key, the agg operator will emit 2 messages: one retracting the old
>> record and one for the new message. In your case, the new message is the
>> DELETE.
>>
>> If there is no stateful operator downstream, the aggregate operator will
>> just emit the update-after (new) message, which is the delete.
>>
>> On Wed, Dec 9, 2020 at 4:30 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Hello,
>>>
>>> I'd like to better understand the delete behavior of AggregateFunctions.
>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>> groups belonging to that user.
>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>> Now let's assume that sometime later deletes arrive for all rows which
>>> produce user_id_1's group_ids.
>>>
>>> Would the aggregate function completely delete the associated state from
>>> RocksDB, or would it leave something like `user_id_1 -> []` sitting in
>>> RocksDB forever?
>>>
>>> We have an aggregate similar to this where users could delete themselves,
>>> and we want to make sure we're not accumulating data forever for those
>>> users.
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>> <https://www.facebook.com/remindhq>
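The rule discussed in this thread is that the aggregate emits a retraction of the old row only when the downstream operator is stateful. It can be sketched as a small plain-Python model; this is not Flink's API, and all names are hypothetical. The message kinds `+I`, `-U`, `+U`, and `-D` mirror the short strings of Flink's `RowKind` (insert, update-before, update-after, delete). In Flink the planner decides whether update-before messages are needed; here that decision is just a boolean parameter. Assumption: when a key's last row is retracted, a single `-D` carrying the old value is emitted. The model compares a non-stateful upsert sink, which only overwrites by key, with a stateful operator B that keeps a running total of group memberships.

```python
def emit_changes(key, old, new, downstream_is_stateful):
    """Messages a group aggregate emits when a key's result changes from
    `old` to `new` (None means the key currently has no result)."""
    if old is None and new is None:
        return []
    if old is None:
        return [("+I", key, new)]
    if new is None:
        return [("-D", key, old)]  # assumption: one -D carrying the old value
    if downstream_is_stateful:
        # B must subtract the old row before adding the new one.
        return [("-U", key, old), ("+U", key, new)]
    return [("+U", key, new)]  # the new row simply overrides the old one


class UpsertSink:
    """Non-stateful consumer: the newest row per key overwrites the previous one."""

    def __init__(self):
        self.table = {}

    def apply(self, kind, key, row):
        if kind in ("+I", "+U"):
            self.table[key] = row
        elif kind == "-D":
            self.table.pop(key, None)
        # "-U" carries nothing an upsert sink needs, so it is ignored.


class TotalMemberships:
    """Stateful operator B: running total of group memberships across users."""

    def __init__(self):
        self.total = 0

    def apply(self, kind, key, row):
        if kind in ("+I", "+U"):
            self.total += len(row)
        else:  # "-U" or "-D": undo the old row's contribution
            self.total -= len(row)


def run(downstream_is_stateful):
    """Replay user_id_1 gaining two groups and then being deleted."""
    history = [None, ["g1"], ["g1", "g2"], None]
    sink, b = UpsertSink(), TotalMemberships()
    totals = []
    for old, new in zip(history, history[1:]):
        for kind, key, row in emit_changes("user_id_1", old, new,
                                           downstream_is_stateful):
            sink.apply(kind, key, row)
            b.apply(kind, key, row)
        totals.append(b.total)  # B's accumulator after each change
    return sink.table, totals
```

Usage:

```python
print(run(downstream_is_stateful=True))   # ({}, [1, 2, 0])
print(run(downstream_is_stateful=False))  # ({}, [1, 3, 1])
```

The upsert sink ends up correct in both cases. B's total only stays correct when the `-U` retraction is emitted. Without it, B double-counts `g1` and never returns to zero.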