Hi Jérémy,

Objects are serialized when you store them in state. So when you retrieve
run_state from state, it is deserialized and you get a fresh instance.
Calling method_caller() then modifies this instance, but *not* the
serialized version stored in state.
In the second attempt you described, you modified the retrieved instance,
then stored the modified version in state (or rather a serialized form of
it) - which works as expected.
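
To illustrate, here is a minimal sketch in plain Python. FakeStorage is a
hypothetical stand-in for ctx.storage, not the StateFun SDK, and JSON stands
in for your dill-based serializer; the Runner is simplified to a bare counter.

```python
import json


class Runner:
    def __init__(self, counter=0):
        self.counter = counter


class FakeStorage:
    """Hypothetical stand-in for ctx.storage: it keeps only serialized text."""

    def __init__(self):
        self._raw = {}

    def put(self, name, runner):
        # Serialize on write: only a snapshot of the object is kept.
        self._raw[name] = json.dumps({"counter": runner.counter})

    def get(self, name):
        # Deserialize on read: every call builds a fresh instance.
        return Runner(**json.loads(self._raw[name]))


storage = FakeStorage()
storage.put("runner", Runner())

# Mutating a retrieved instance does not touch the stored snapshot.
local = storage.get("runner")
local.counter += 1
lost = storage.get("runner").counter   # the stored value is unchanged

# Writing the modified instance back persists the change.
storage.put("runner", local)
kept = storage.get("runner").counter   # the stored value now reflects it
```

Here lost reflects the unchanged stored snapshot, while kept reflects the
write-back, which matches the two behaviours you observed.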

I don't think there is any way around explicitly storing any local changes
you made in state.
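
If the explicit write-back gets repetitive, one option is to wrap the
read-modify-write cycle in a small helper. The sketch below is only an
illustration: update_state and CopyingStorage are hypothetical names, and
CopyingStorage merely imitates a storage object that hands out copies on
attribute access (using copy.deepcopy instead of real serialization).

```python
import copy


def update_state(storage, name, mutate):
    """Read the value stored under `name`, apply `mutate`, write it back."""
    value = getattr(storage, name)   # read: yields a detached copy
    mutate(value)                    # change the local copy in place
    setattr(storage, name, value)    # write: store the modified copy again
    return value


class CopyingStorage:
    """Hypothetical stand-in for ctx.storage: reads and writes copy the value."""

    def __init__(self):
        object.__setattr__(self, "_values", {})

    def __getattr__(self, name):
        # Only called for names that are not regular instance attributes.
        try:
            return copy.deepcopy(self._values[name])
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        self._values[name] = copy.deepcopy(value)


class Runner:
    def __init__(self):
        self.counter = 0


def bump(runner):
    runner.counter += 1


storage = CopyingStorage()
storage.runner = Runner()

storage.runner.counter += 1            # modifies a temporary copy only
before = storage.runner.counter

update_state(storage, "runner", bump)  # read, modify, write back
after = storage.runner.counter
```

In your case the mutate argument could be a function that calls
prepare_smth() on the retrieved run_state's runner and then invokes the
returned closure, so that the change and the write-back always go together.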

Best,
Nico

On Mon, Sep 20, 2021 at 5:22 PM Jérémy Albrecht <jalbre...@skapane.ai>
wrote:

> Hi all!
>
> I am currently developing an environment that uses Flink Stateful
> Functions in Python. The architecture is complex, but here are the main
> points that need to be understood to frame the problem I am facing.
> * functions.py contains several methods. One of them handles protobuf
> messages and stores into context.storage a ValueSpec which is a Python
> object. This object contains classical types such as native types, but
> also lists of named tuples and objects.
> Since pickle is not able to serialize some objects, such as locally
> defined functions, I use dill to serialize this object and make a
> Flink-compatible type.
> * The object stored in the Context can be summarized like this:
> @dataclass
> class RunState:
>     duration: int
>     runner: Runner
> and runner is an instance of the class Runner, defined as:
> class Runner:
>     counter: int = 0
>
>     def prepare_smth(self):
>         self.counter = 10
>
>         def add_to_counter():
>             self.counter += 1
>
>         return add_to_counter
>
> From the object stored in the state of the Flink function, I therefore
> have access to the function definition *add_to_counter* and I can do
> something like:
> method_caller = ctx.storage.run_state.runner.prepare_smth()
> method_caller()
>
> -> From now on, I expect the attribute counter of the Runner instance
> defined as runner in the RunState to have the value 11. What happens is
> that the value is modified inside the context of add_to_counter and
> prepare_smth, but the change is never reflected in the object stored in
> the context. ctx.storage.run_state.runner.counter still equals 0.
>
> * After some research with the debugger, in my opinion, the program tries
> to update the value, but ctx.storage seems to be updatable only when I do
> a reassignment:
> e.g. ctx.storage.run_state.runner.counter += 1 has NO effect, but
> rs = ctx.storage.run_state
> rs.runner.counter += 1
> ctx.storage.run_state = rs
> has the expected result.
>
> If you have any clue about how to start or what to do about my problem, I
> would greatly appreciate the help!
> Don't hesitate to ask me if anything is unclear :)
>
> Thanks,
> Jérémy
>
>
