Hi all,
if you don't want to read the wall of text below, in short: I want to 
know whether it is possible to run a superstep-based iteration on a 
possibly unbounded DataStream in Flink in an efficient way, and which 
general concept(s) of synchronization you would suggest for that.

I would like to write a program in which the vertices of a graph 
(represented simply as Longs for now) each hold keyed state and 
communicate with each other through messages that arrive in an iterated 
stream.
From the outside I would only receive messages that add a vertex (or, 
possibly in the future, delete one, but that can be ignored for now), 
together with some additional information specified by the program. 
Such a message can be assumed to have the same form as any other 
message. Everything else would happen in an iterated stream keyed by 
the vertex to which a message is addressed. In that stream a vertex, 
implemented as a KeyedProcessFunction (or a 
KeyedBroadcastProcessFunction if a BroadcastStream is used for 
synchronization), can send new messages to any other vertex (or to 
itself) based on the received message(s) and its own current state, and 
can also update its state based on the received message(s). The new 
messages would then be fed back into the iterated stream. Without any 
synchronization this works quite well, but it doesn't produce useful 
results for my problem, since the order in which the messages arrive 
cannot be guaranteed.
What I would ideally like to have is a Pregel-like superstep-based 
iteration which runs on a batch of outside messages (here: vertex 
additions) until no more messages are produced and after that repeats 
that with the next batch of vertices either infinitely or until there 
are no more new messages received. During the execution of each batch 
all vertices (including older ones) can be activated again by receiving 
a message and the state of each vertex should be preserved throughout 
the execution of the program. The problem lies in how I can separate the 
messages into supersteps in an iterative partitioned stream, similar to 
the iterations in the DataSet API.
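To make the intended semantics concrete, here is a minimal sketch in 
plain Java (no Flink involved) of the loop I have in mind. All names, 
the fixed chain topology 1 -> 2 -> 3 and the "keep the smallest value" 
vertex logic are made up purely for illustration. Only the control flow 
matters: one batch of outside messages is processed in supersteps until 
no more messages are produced, and the vertex state survives across 
batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SuperstepSketch {
    /** A message addressed to a vertex; the long payload is a hypothetical example value. */
    public static final class Message {
        public final long target;
        public final long value;
        public Message(long target, long value) { this.target = target; this.value = value; }
    }

    // Per-vertex state, standing in for Flink keyed state; it is kept across batches.
    public final Map<Long, Long> state = new TreeMap<>();

    // Hypothetical fixed topology for this example only: 1 -> 2 -> 3.
    private final Map<Long, List<Long>> edges =
            Map.of(1L, List.of(2L), 2L, List.of(3L), 3L, List.<Long>of());

    /** Example vertex logic: keep the smallest value seen; on improvement send value + 1 to neighbours. */
    List<Message> compute(long vertex, List<Message> inbox) {
        long old = state.getOrDefault(vertex, Long.MAX_VALUE);
        long best = old;
        for (Message m : inbox) best = Math.min(best, m.value);
        List<Message> out = new ArrayList<>();
        if (best < old) {
            state.put(vertex, best);
            for (long n : edges.getOrDefault(vertex, List.of())) out.add(new Message(n, best + 1));
        }
        return out;
    }

    /** Groups messages by target vertex, similar in spirit to keyBy on the target. */
    private static Map<Long, List<Message>> groupByTarget(List<Message> messages) {
        Map<Long, List<Message>> inboxes = new TreeMap<>();
        for (Message m : messages) inboxes.computeIfAbsent(m.target, k -> new ArrayList<>()).add(m);
        return inboxes;
    }

    /** Runs supersteps on one batch until no messages are produced; returns the number of supersteps. */
    public int runBatch(List<Message> batch) {
        Map<Long, List<Message>> current = groupByTarget(batch);
        int supersteps = 0;
        while (!current.isEmpty()) {
            List<Message> produced = new ArrayList<>();
            for (Map.Entry<Long, List<Message>> inbox : current.entrySet()) {
                produced.addAll(compute(inbox.getKey(), inbox.getValue()));
            }
            // Barrier: messages produced in this superstep are only delivered in the next one.
            current = groupByTarget(produced);
            supersteps++;
        }
        return supersteps;
    }

    public static void main(String[] args) {
        SuperstepSketch graph = new SuperstepSketch();
        System.out.println("batch 1 supersteps: " + graph.runBatch(List.of(new Message(1L, 0L))));
        System.out.println("batch 2 supersteps: " + graph.runBatch(List.of(new Message(2L, 0L))));
        System.out.println("final state: " + graph.state);
    }
}
```

In this single-threaded form the barrier is trivial, because the loop 
only moves on once every inbox of the current superstep has been 
handled. What I am missing is an efficient equivalent of that barrier 
when the inboxes are spread over parallel partitions of a stream.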
One idea I had was just making tumbling windows of a large enough amount 
of time which would just collect all the messages and then emit them in 
a ProcessWindowFunction once the window fires. While this would be quite 
a simple solution that requires little non-parallel synchronization, it 
would obviously require that we know a window length after which all 
messages are guaranteed to have been processed and all new messages for 
the next superstep produced, which is realistically not the case. It 
would also mean that in most supersteps the program would wait longer 
than necessary until it starts the next superstep. Fault tolerance would 
also be very hard to achieve.
Another, more complex idea was to synchronize globally through an 
object that remembers which vertices have been sent messages in the 
previous superstep. It would be informed before any message is sent and 
again when a vertex has finished processing a message, and it would 
tell the vertex whether there are globally no more messages to be 
processed. If that is the case, the vertex sends a NextSuperstep 
message which is broadcast to all partitions with a BroadcastStream. 
After that, all vertices can start processing the messages sent to 
them in the previous superstep. Apart from not being trivial to 
synchronize correctly (which I'm working on myself), this approach has 
the obvious disadvantage that a lot of information has to be passed to 
this object in a globally synchronized manner, which largely defeats 
the purpose of parallel processing. Although some global 
synchronization probably has to take place, this approach seems rather 
inefficient to me.
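For reference, the bookkeeping of such a synchronization object could 
look roughly like the following sketch in plain Java. The class and 
method names are hypothetical and none of this is Flink API. It only 
counts messages, assumes that every sent message is reported exactly 
once before sending and once after processing, and leaves out how the 
NextSuperstep message is actually broadcast.

```java
public class SuperstepCoordinator {
    private long inFlight;    // messages of the current superstep that are not yet processed
    private long queuedNext;  // messages already sent for the next superstep
    private int superstep;    // number of completed supersteps

    /** Called when a new batch of outside messages is injected. */
    public synchronized void startBatch(long externalMessages) {
        inFlight = externalMessages;
    }

    /** Called by a vertex before it sends a message (which belongs to the next superstep). */
    public synchronized void beforeSend() {
        queuedNext++;
    }

    /**
     * Called by a vertex after it has finished processing one message.
     * Returns true if this was the last message of the current superstep,
     * in which case the caller would broadcast a NextSuperstep message.
     */
    public synchronized boolean afterProcess() {
        inFlight--;
        if (inFlight > 0) {
            return false;
        }
        // Superstep finished: the queued messages become the new current superstep.
        inFlight = queuedNext;
        queuedNext = 0;
        superstep++;
        return true;
    }

    /** True once no message is left anywhere, i.e. the batch has terminated. */
    public synchronized boolean isIdle() {
        return inFlight == 0 && queuedNext == 0;
    }

    public synchronized int completedSupersteps() {
        return superstep;
    }

    public static void main(String[] args) {
        SuperstepCoordinator c = new SuperstepCoordinator();
        c.startBatch(2);                          // two outside messages
        c.beforeSend();                           // first vertex sends one message
        System.out.println(c.afterProcess());     // first message done, superstep still open
        System.out.println(c.afterProcess());     // second message done, superstep complete
        System.out.println(c.afterProcess());     // the single message of superstep 2 done
        System.out.println(c.isIdle());
    }
}
```

Every send and every processed message goes through one synchronized 
object here, which is exactly the bottleneck described above.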
Since I haven't been working with Flink for very long, although I have 
used it intensively for the past couple of weeks and read all relevant 
documentation I could find, I would like to ask if someone has a 
suggestion for how to implement such a superstep-based iteration in the 
DataStream API in the most efficient way, and whether you think this 
is actually even a worthwhile endeavor. I would especially like to know 
if Flink already provides classes, methods or concepts that would be 
helpful for that.
Since our project isn't really close to a finished program yet and 
consists mainly of various test programs, I cannot really show you the 
complete code of what I already have, but if anyone has any specific 
questions I can probably send you pseudocode or Java code from one of 
these test programs to describe what I imagine. Also, since we are still 
relatively open on how exactly we want to solve our original problem, 
I'm also open to suggestions which solve only a similar problem, even 
if they don't fully fit what I described above.
It's of course also possible that there is already a simple solution in 
Flink which I somehow managed to overlook until now. In that case I'm 
sorry for bothering you but I would still like to know what I should 
look up.

Best, Christian


