Hi Chao,

1) Regarding the difference between CoFlatMapFunction and CoProcessFunction, let me quote the javadoc of CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the time (both event and processing) and set timers, through the provided {@link Context}. When reacting to the firing of set timers the function can emit yet more elements."

So, imho, the two offer different levels of abstraction and control (high- vs. low-level). Also note the different methods each one asks you to implement.

2a) In general, Flink calls functions on a per-record basis, serialized per task. If a TaskManager has multiple slots, each task uses its own function object, so you should only get into trouble if you share static references. Otherwise you do not need to worry about thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should apply to CoFlatMapFunction and CoProcessFunction, so flatMap1/2 and processElement1/2 are not called in parallel!

3) Why would you want to store the CoProcessFunction.Context?

Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> Hi,
>
> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> and to what extent? What's the difference between the two Functions? And
> in general, how does Flink prevent race conditions? Here's my case:
>
> I tried to condition on two input streams and produce the third stream
> if the condition is met. I implemented CoFlatMapFunction and tried to
> monitor a state using a field in the implemented class (I want to
> isolate my application from the checkpointing feature, and therefore I
> do not use the states as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> The field served as a flag indicating whether there is some pending
> data from either input stream, and if yes, processing it along with the
> arriving data from the other input stream (the processing invokes a native
> function).
>
> But then I got a double free error and a segmentation fault, which I believe
> was due to unintentional concurrent access to the native function. Then
> I tried to wrap the access into a synchronized method, as well as
> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> and the error remained.
>
> I considered using CoProcessFunction in my case, but it seems to me that it
> does not handle custom internal state, stating in the javadoc "The
> context [CoProcessFunction.Context] is only valid during the invocation
> of this method, do not store it."
>
> Thanks,
> Chao
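
To illustrate points 2a and 2b of the reply, here is a minimal sketch in plain Java with no Flink dependency. `TwoInputFunction` is a hypothetical stand-in for CoFlatMapFunction (it is not the real Flink interface, and a plain `List` replaces the Collector), and `Pairing` is an invented example of the "pending data" pattern described in the question. It assumes, as stated in 2b, that the two callbacks of one function object are never invoked in parallel, so the buffers are ordinary instance fields without locks; the part to avoid is making them (or a native handle) static.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PairingSketch {

    /** Hypothetical stand-in for Flink's CoFlatMapFunction; NOT the real interface. */
    interface TwoInputFunction<A, B, OUT> {
        void flatMap1(A value, List<OUT> out);
        void flatMap2(B value, List<OUT> out);
    }

    /**
     * Pairs each element of input 1 with one element of input 2.
     * Pending elements live in instance fields, so every function object
     * (one per parallel task) has its own buffers. No synchronization is
     * used because the callbacks are assumed to be called one at a time.
     */
    static class Pairing implements TwoInputFunction<Integer, String, String> {
        private final Deque<Integer> pendingLeft = new ArrayDeque<>();
        private final Deque<String> pendingRight = new ArrayDeque<>();

        @Override
        public void flatMap1(Integer value, List<String> out) {
            if (pendingRight.isEmpty()) {
                pendingLeft.add(value);                      // nothing to pair with yet
            } else {
                out.add(value + ":" + pendingRight.poll());  // pair with oldest pending
            }
        }

        @Override
        public void flatMap2(String value, List<String> out) {
            if (pendingLeft.isEmpty()) {
                pendingRight.add(value);                     // nothing to pair with yet
            } else {
                out.add(pendingLeft.poll() + ":" + value);   // pair with oldest pending
            }
        }
    }

    /** Feeds interleaved records from both inputs, one call at a time. */
    static List<String> run() {
        Pairing fn = new Pairing();
        List<String> out = new ArrayList<>();
        fn.flatMap1(1, out);   // buffered
        fn.flatMap2("a", out); // pairs with 1
        fn.flatMap2("b", out); // buffered
        fn.flatMap1(2, out);   // pairs with "b"
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1:a, 2:b]
    }
}
```

In a real job the same structure would go into a CoFlatMapFunction or CoProcessFunction implementation; a native resource would likewise be held per function object rather than through a static reference.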