Hi,

Does this question make sense or am I missing something?
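
To make the question concrete, here is a toy model in plain Java. It uses no Flink APIs: `GroupIdAggSketch`, `Acc`, and the `HashMap` standing in for RocksDB keyed state are all made up for illustration. It shows the behavior I'm hoping for, not something I've confirmed Flink does: once retract empties the set, the entry for that user is dropped rather than left as `user_id_1 -> []`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Toy model only: plain Java, no Flink classes involved. */
public class GroupIdAggSketch {

    /** Hypothetical accumulator: the set of group ids for one user. */
    static final class Acc {
        final Set<Long> groupIdSet = new LinkedHashSet<>();
    }

    // Stand-in for the keyed state kept in RocksDB: user_id -> accumulator.
    private final Map<Long, Acc> state = new HashMap<>();

    /** An insert arrives for (userId, groupId). */
    void accumulate(long userId, long groupId) {
        state.computeIfAbsent(userId, k -> new Acc()).groupIdSet.add(groupId);
    }

    /** A delete arrives for (userId, groupId). */
    void retract(long userId, long groupId) {
        Acc acc = state.get(userId);
        if (acc == null) {
            return; // nothing stored for this user
        }
        acc.groupIdSet.remove(groupId);
        // The step I'm asking about: drop the whole entry once it is empty,
        // instead of keeping `user_id -> []` around forever.
        if (acc.groupIdSet.isEmpty()) {
            state.remove(userId);
        }
    }

    boolean hasState(long userId) {
        return state.containsKey(userId);
    }

    Set<Long> groups(long userId) {
        Acc acc = state.get(userId);
        return acc == null ? Collections.emptySet() : acc.groupIdSet;
    }

    public static void main(String[] args) {
        GroupIdAggSketch agg = new GroupIdAggSketch();
        agg.accumulate(1L, 10L);
        agg.accumulate(1L, 20L);
        agg.retract(1L, 10L);
        System.out.println("after first delete, has state: " + agg.hasState(1L));
        agg.retract(1L, 20L);
        System.out.println("after last delete, has state: " + agg.hasState(1L));
    }
}
```

In the real job my retract method only receives the accumulator, not the state backend, which is why I'm unsure whether the removal is my responsibility or Flink's.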

Thanks!

On Thu, Dec 10, 2020 at 10:24 AM Rex Fenley <r...@remind101.com> wrote:

> Ok, that makes sense.
>
> > You just need to correct the acc state to what it expects to be (say
> re-evaluate the acc without the record that needs retraction) when you
> receive the retraction message.
>
> So for example, if I just remove all items from acc.groupIdSet on
> retraction, will it know to clear out the state entirely from rocks?
>
> If a user gets deleted altogether (and my groupby is on user_id) what sort
> of retraction do I need to evaluate then? Because I'm thinking now it will
> need to just delete the state entirely and pass a full retraction of the
> state downstream, but I don't know if deleting state from rocks happens
> automatically or I need to make it do that in the retract method somehow.
>
> On Wed, Dec 9, 2020 at 6:16 PM Danny Chan <danny0...@apache.org> wrote:
>
>> No, the group agg, stream-stream join and rank are all stateful
>> operators which need a state-backend to bookkeep the acc values.
>>
>> But it is only required to emit the retractions when the stateful
>> operator A has a downstream operator B that is also stateful, because B
>> needs the retractions to correct its accs. If B is not stateful, just
>> emitting the new record to override the old one is enough.
>>
>> You just need to correct the acc state to what it expects to be (say
>> re-evaluate the acc without the record that needs retraction) when you
>> receive the retraction message.
>>
>> On Thu, Dec 10, 2020 at 2:44 AM Rex Fenley <r...@remind101.com> wrote:
>>
>>> So from what I'm understanding, the aggregate itself is not a "stateful
>>> operator" but one may follow it? How does the aggregate accumulator keep
>>> old values then? It can't all just live in memory. Actually, looking at
>>> the savepoints, it looks like there's state associated with our aggregate
>>> operator.
>>>
>>> To clarify my concern too, in my retract function impl in the aggregate
>>> function class, all I do is remove a value (a group id) from the
>>> accumulator set (which is an array). For example, if there is only 1
>>> group_id left for a user and it gets deleted, that group_id will be removed
>>> from the accumulator set and the set will be empty. I would hope that at
>>> that point, given that there are no remaining rows for the aggregate,
>>> either I could delete the associated stored accumulator altogether or
>>> Flink would do it for me, i.e. delete `user_id_1 -> []`. Is it possible
>>> that both the groups and the user need to be deleted for everything to
>>> clear from storage? That might make more sense, actually.
>>>
>>> If this doesn't happen, since users delete themselves and their groups
>>> all the time, we'll be storing all these empty data sets in rocks for no
>>> reason. To clarify, we're using Debezium as our source and using Flink as a
>>> materialization engine, so we never want to explicitly set a timeout on any
>>> of our data, we just want to scale up predictably with our user growth.
>>>
>>> Thanks!
>>>
>>> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:
>>>
>>>> Hi, Rex Fenley ~
>>>>
>>>> If there is a stateful operator downstream of the aggregate function,
>>>> then each time the function receives an update (or delete) for the key,
>>>> the agg operator emits 2 messages: one retracting the old record and one
>>>> for the new message. For your case, the new message is the DELETE.
>>>>
>>>> If there is no stateful operator, the aggregate operator just emits the
>>>> update-after (new) message, which is the delete.
>>>>
>>>> On Wed, Dec 9, 2020 at 4:30 AM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'd like to better understand delete behavior of AggregateFunctions.
>>>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>>>> groups belonging to that user.
>>>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>>>> Now let's assume sometime later that deletes arrive for all rows which
>>>>> produce user_id_1's group_ids.
>>>>>
>>>>> Would the aggregate function completely delete the associated state
>>>>> from RocksDB or would it leave something like `user_id_1 -> []` sitting in
>>>>> RocksDB forever?
>>>>>
>>>>> We have an aggregate similar to this where users could delete
>>>>> themselves and we want to make sure we're not accumulating data forever
>>>>> for those users.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
>>>>
>>>
>>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>
