Hi, Does this question make sense or am I missing something?
Thanks!

On Thu, Dec 10, 2020 at 10:24 AM Rex Fenley <r...@remind101.com> wrote:

> Ok, that makes sense.
>
> > You just need to correct the acc state to what it expects to be (say
> > re-evaluate the acc without the record that needs retraction) when you
> > receive the retraction message.
>
> So for example, if I just remove all items from acc.groupIdSet on
> retraction, will it know to clear out the state entirely from RocksDB?
>
> If a user gets deleted altogether (and my group-by is on user_id), what sort
> of retraction do I need to evaluate then? I'm thinking now that it will
> need to delete the state entirely and pass a full retraction of the
> state downstream, but I don't know whether deleting state from RocksDB happens
> automatically or whether I need to make it do that in the retract method somehow.
>
> On Wed, Dec 9, 2020 at 6:16 PM Danny Chan <danny0...@apache.org> wrote:
>
>> No, the group agg, stream-stream join and rank are all stateful
>> operators which need a state backend to bookkeep the acc values.
>>
>> But it is only required to emit the retractions when the stateful
>> operator A has a downstream operator B that is also stateful, because B
>> needs the retractions to correct its accs. If B is not stateful, just
>> emitting the new record to override is enough.
>>
>> You just need to correct the acc state to what it expects to be (say
>> re-evaluate the acc without the record that needs retraction) when you
>> receive the retraction message.
>>
>> Rex Fenley <r...@remind101.com> wrote on Thu, Dec 10, 2020 at 2:44 AM:
>>
>>> So from what I'm understanding, the aggregate itself is not a "stateful
>>> operator" but one may follow it? How does the aggregate accumulator keep
>>> old values then? It can't all just live in memory; actually, looking at the
>>> savepoints, it looks like there's state associated with our aggregate
>>> operator.
>>>
>>> To clarify my concern too: in my retract function impl in the aggregate
>>> function class, all I do is remove a value (a group id) from the
>>> accumulator set (which is an array). For example, if there is only 1
>>> group_id left for a user and it gets deleted, that group_id will be removed
>>> from the accumulator set and the set will be empty. I would hope that at
>>> that point, given that there are no remaining rows for the aggregate,
>>> either I could, or Flink would, just delete the associated stored accumulator
>>> altogether, i.e. delete `user_id_1 -> []`. Is it possible that both the
>>> groups and the user need to be deleted for everything to clear from
>>> storage? That might make more sense, actually.
>>>
>>> If this doesn't happen, since users delete themselves and their groups
>>> all the time, we'll be storing all these empty data sets in RocksDB for no
>>> reason. To clarify, we're using Debezium as our source and using Flink as a
>>> materialization engine, so we never want to explicitly set a timeout on any
>>> of our data; we just want to scale up predictably with our user growth.
>>>
>>> Thanks!
>>>
>>> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:
>>>
>>>> Hi, Rex Fenley ~
>>>>
>>>> If there is a stateful operator downstream of the aggregate function,
>>>> then each time the function receives an update (or delete) for the key, the
>>>> agg operator emits 2 messages: one retracting the old record and one
>>>> for the new message. For your case, the new message is the DELETE.
>>>>
>>>> If there is no stateful operator, the aggregate operator would just
>>>> emit the update-after (new) message, which is the delete.
>>>>
>>>> Rex Fenley <r...@remind101.com> wrote on Wed, Dec 9, 2020 at 4:30 AM:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'd like to better understand delete behavior of AggregateFunctions.
>>>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>>>> groups belonging to that user.
>>>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>>>> Now let's assume sometime later that deletes arrive for all rows which
>>>>> produce user_id_1's group_ids.
>>>>>
>>>>> Would the aggregate function completely delete the associated state
>>>>> from RocksDB or would it leave something like `user_id_1 -> []` sitting in
>>>>> RocksDB forever?
>>>>>
>>>>> We have an aggregate similar to this where users could delete
>>>>> themselves and we want to make sure we're not accumulating data forever
>>>>> for those users.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>>
>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>>> <https://www.facebook.com/remindhq>

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
FOLLOW US <https://twitter.com/remindhq> | LIKE US
<https://www.facebook.com/remindhq>
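
To make the question in this thread concrete, here is a minimal sketch of the bookkeeping being discussed: a `user_id -> set of group_ids` accumulator that is corrected on each retraction and dropped once it is empty. It is plain Java, not Flink API. The class name `GroupIdStateSketch`, its methods, and the in-memory `HashMap` standing in for keyed state are all hypothetical, and the "remove the entry when the set is empty" step models the behavior being asked for here, not a confirmed description of what Flink or RocksDB does.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for keyed state: user_id -> set of group_ids.
// Assumption: a HashMap models the state backend; this is not Flink API.
final class GroupIdStateSketch {
    private final Map<String, Set<String>> state = new HashMap<>();

    // A new row (user_id, group_id) arrives: add it to the accumulator.
    void accumulate(String userId, String groupId) {
        state.computeIfAbsent(userId, k -> new LinkedHashSet<>()).add(groupId);
    }

    // A retraction for (user_id, group_id) arrives: re-evaluate the
    // accumulator without that record.
    void retract(String userId, String groupId) {
        Set<String> groupIds = state.get(userId);
        if (groupIds == null) {
            return; // nothing stored for this key
        }
        groupIds.remove(groupId);
        // Assumed desired behavior: drop `user_id -> []` instead of keeping it.
        if (groupIds.isEmpty()) {
            state.remove(userId);
        }
    }

    // True while any entry (even a non-empty set) is stored for the key.
    boolean hasEntry(String userId) {
        return state.containsKey(userId);
    }

    // Current accumulator for the key, or an empty set if none is stored.
    Set<String> groupIds(String userId) {
        return state.getOrDefault(userId, Collections.emptySet());
    }

    public static void main(String[] args) {
        GroupIdStateSketch s = new GroupIdStateSketch();
        s.accumulate("user_id_1", "group_id_1");
        s.accumulate("user_id_1", "group_id_2");
        s.retract("user_id_1", "group_id_1");
        System.out.println(s.groupIds("user_id_1")); // prints [group_id_2]
        s.retract("user_id_1", "group_id_2");
        System.out.println(s.hasEntry("user_id_1")); // prints false
    }
}
```

In this sketch the last retraction for a key removes the whole entry, so no `user_id_1 -> []` is left behind. Whether the real operator clears its state the same way is exactly the open question above.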