Hi Jérémy,

Objects are serialized when you store them in state. So when you retrieve run_state from state, it is deserialized and you get a fresh instance. Calling method_caller() then modifies this instance, but *not* the serialized version stored in state. In the second attempt you described, you modified the retrieved instance and then stored the modified version (or rather a serialized form of it) back in state, which works as expected.
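To make the difference concrete, here is a minimal sketch that runs without Flink. FakeStorage is a hypothetical stand-in for ctx.storage, not the real SDK class, and it uses JSON where your setup uses dill. The only assumption it models is the behavior described above: state holds serialized bytes, every read deserializes into a new object, and only an assignment rewrites the bytes.

```python
import json
from dataclasses import dataclass, field, asdict


@dataclass
class Runner:
    counter: int = 0

    def prepare_smth(self):
        self.counter = 10

        def add_to_counter():
            # Closure over this particular Runner instance.
            self.counter += 1

        return add_to_counter


@dataclass
class RunState:
    duration: int
    runner: Runner = field(default_factory=Runner)


class FakeStorage:
    """Hypothetical stand-in for ctx.storage: it keeps only serialized bytes."""

    def __init__(self):
        self._raw = None

    @property
    def run_state(self):
        # Every read deserializes into a brand-new object graph.
        data = json.loads(self._raw.decode("utf-8"))
        return RunState(duration=data["duration"], runner=Runner(**data["runner"]))

    @run_state.setter
    def run_state(self, value):
        # Only an explicit assignment updates the stored bytes.
        self._raw = json.dumps(asdict(value)).encode("utf-8")


storage = FakeStorage()
storage.run_state = RunState(duration=5)

# Attempt 1: mutate the object returned by a read, without writing it back.
method_caller = storage.run_state.runner.prepare_smth()
method_caller()
counter_without_write_back = storage.run_state.runner.counter

# Attempt 2: read once, mutate the local copy, then assign it back.
rs = storage.run_state
rs.runner.prepare_smth()()
storage.run_state = rs
counter_with_write_back = storage.run_state.runner.counter

print(counter_without_write_back, counter_with_write_back)
```

In the first attempt the closure updates a temporary object that is discarded afterwards, so the stored counter stays at its initial value. In the second attempt the assignment re-serializes the modified copy, so the change is persisted.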
I don't think there is any way around explicitly storing any local changes you made in state.

Best,
Nico

On Mon, Sep 20, 2021 at 5:22 PM Jérémy Albrecht <jalbre...@skapane.ai> wrote:

> Hi all !
>
> I am currently developing an environment that uses Flink Stateful
> functions in Python. The architecture is complex but here are the main
> points that need to be understood to frame the problem I am facing.
> * functions.py contains several methods, one of them is handling protobuf
> messages and storing into context.storage a ValueSpec which is a python
> Object. This object contains classical types such as native types, but
> also lists of named tuples and objects.
> Since pickle is not able to serialize objects and functions, I use dill to
> serialize this object and make a Flink-compatible type.
> * The object stored in the Context can be summarized as this:
> @dataclass
> class RunState:
>     duration: int
>     runner: Runner
> and runner is also an instance of the class Runner, defined as:
> class Runner:
>     counter: int = 0
>
>     def prepare_smth(self):
>         self.counter = 10
>
>         def add_to_counter():
>             self.counter += 1
>
>         return add_to_counter
>
> From the object stored in the state of the Flink function I thus have access
> to the function definition *add_to_counter* and I can do something like:
> method_caller = ctx.storage.run_state.runner.prepare_smth()
> method_caller()
>
> -> from now on I expect the attribute counter from the Runner instance
> defined as runner in the Run State to have the value 11. What happens is
> that the value is modified inside the context of add_to_counter and
> prepare_smth, but the change is never reflected inside the object stored in
> the context. ctx.storage.state.runner.counter still equals 0.
>
> * After some research with the debugger, in my opinion, the program tries
> to update the value but ctx.storage seems to be only updatable when I do a
> reassignment:
> e.g. ctx.storage.run_state.counter += 1 has NO effect, but
> rs = ctx.storage.run_state
> rs.counter += 1
> ctx.storage.run_state = rs has the expected result.
>
> If you have any clue about how to start or what to do about my problem I
> would greatly appreciate the help !!
> Don't hesitate to ask me if anything is unclear :)
>
> Thanks,
> Jérémy