Hi all !

I am currently developing an environment that uses Flink Stateful functions in 
Python. The architecture is complex but here are the main points that needs to 
be understood to frame the problem I am facing.
* functions.py contains several methods, one of them is handling protobuf 
messages and storing into context.storage a ValueSpec which is a python Object. 
This object contains classical types such as natives types, but also lists of 
named tuple and objects.
Since pickle is not able to serialize objects and functions, I use dill to 
serialize this object and make a Flink-compatible type. 
* The object stored in the Context can be resumed as this:
@dataclass
class RunState:
    duration: int
    runner: Runner
and runner is also an instance of the class Runner, defined as:
class Runner:
    counter: int = 0
    
    def prepare_smth(self):
        self.counter = 10
        
        def add_to_counter():
            self.counter += 1
       
       return add_to_counter

>From the object stored in the state of the Flink function I so have access to 
>the function definition *add_to_counter* and I can do something like:
method_caller = ctx.storage.run_state.runner.prepare_smth()
method_caller()

-> from now on I expect the attribute counter from the Runner instance defined 
as runner in the Run State to have the value 11. What happens is that the value 
is modified inside the context of add_to_counter and prepare_smth, but the 
change is never reflected inside the object stored in the context. 
ctx.storage.state.runner.counter still equals to 0. 

* After some research with the debugger, in my opinion, the program tries to 
update the value but ctx.storage seems to be only updatable when I do a 
reassignement: 
e.g. ctx.storage.run_state.counter += 1 has NO effect, but 
rs = ctx.storage.run_state
rs.counter +=1
ctx.storage.run_state = rs has the expected result.

If you have any clue about how to start or what to do about my problem I 
greatly appreciate the help !!
Don't hesitate to ask me if anything is unclear :)

Thanks,
Jérémy
    

Reply via email to