Hi Chao,

What I meant by "per-record base" should actually have been "per-event basis" (an event being one entity of whatever the stream contains). As the API suggests, processing happens one event at a time, and this is also what Flink does internally.
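Here is a minimal plain-Java sketch of that one-event-at-a-time model. It has no Flink dependency. `PendingJoinSketch`, `process1` and `process2` are hypothetical stand-ins for a CoFlatMapFunction's `flatMap1`/`flatMap2`, and `main` plays the role of the task thread delivering events. Because only one method runs at a time on a given instance, the per-instance "pending" fields need no locking. Keep in mind that plain fields like these are not part of Flink's checkpointed state, so they are lost on failure.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical two-input function (NOT Flink's CoFlatMapFunction interface):
 * it remembers pending data from one input and combines it with the next
 * element arriving on the other input.
 */
final class PendingJoinSketch {

    // Per-instance state. Safe without locks as long as process1/process2
    // are invoked one event at a time on this instance.
    private String pendingLeft = null;
    private String pendingRight = null;
    private final List<String> out = new ArrayList<>();

    // Called for each event of the first input (stand-in for flatMap1).
    void process1(String left) {
        if (pendingRight != null) {
            out.add(left + "+" + pendingRight);
            pendingRight = null;
        } else {
            pendingLeft = left; // keeps only the latest pending element
        }
    }

    // Called for each event of the second input (stand-in for flatMap2).
    void process2(String right) {
        if (pendingLeft != null) {
            out.add(pendingLeft + "+" + right);
            pendingLeft = null;
        } else {
            pendingRight = right; // keeps only the latest pending element
        }
    }

    List<String> results() {
        return out;
    }

    public static void main(String[] args) {
        PendingJoinSketch f = new PendingJoinSketch();
        // A single driver delivers events one at a time, interleaving both inputs.
        f.process1("a1"); // buffered
        f.process2("b1"); // emits a1+b1
        f.process2("b2"); // buffered
        f.process1("a2"); // emits a2+b2
        System.out.println(f.results()); // prints [a1+b1, a2+b2]
    }
}
```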
Nico

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:
> Thank you, Nico. That helps me a lot!
>
> 2a) That really clarifies my understanding of Flink. Yes, I think I
> have used static references, since I invoked a native function
> (implemented through JNI) which I believe has only one instance per
> process. And I guess the reason why those Java synchronization
> mechanisms were in vain is that there are separate function objects at
> runtime, which results in separate lock objects. Now I use a C++ mutex
> within the native function and it resolves my case.
>
> BTW, could you elaborate a bit more on what you mean by
> "per-record base"? What do you mean by a record?
>
> 3) I do not intend to store the CoProcessFunction.Context. I was just
> wondering: since the documentation says it is only valid during the
> invocation, I guess I cannot use it for maintaining custom state of my
> program logic.
>
>
> Thank you,
> Chao
>
> On 08/16/2017 03:31 AM, Nico Kruber wrote:
> > Hi Chao,
> >
> > 1) Regarding the difference between CoFlatMapFunction and CoProcessFunction,
> > let me quote the javadoc of the CoProcessFunction:
> >
> > "Contrary to the {@link CoFlatMapFunction}, this function can also query
> > the time (both event and processing) and set timers, through the provided
> > {@link Context}. When reacting to the firing of set timers the function
> > can emit yet more elements."
> >
> > So, imho, the two deliver different levels of abstraction and control
> > (high- vs. low-level). Also note the different methods available for you
> > to implement.
> >
> > 2a) In general, Flink calls functions on a per-record base in a serialized
> > fashion per task. For each task at a TaskManager (in case it has
> > multiple slots), separate function objects are used, so you should only
> > get into trouble if you share static references. Otherwise you do not need
> > to worry about thread-safety.
> >
> > 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> > apply to CoFlatMapFunction and CoProcessFunction, so flatMap1/2 and
> > processElement1/2 are not called in parallel!
> >
> > 3) Why would you want to store the CoProcessFunction.Context?
> >
> >
> > Nico
> >
> > On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> >> Hi,
> >>
> >> I'd like to know whether CoFlatMapFunction/CoProcessFunction is
> >> thread-safe, and to what extent. What's the difference between the two
> >> functions? And in general, how does Flink prevent race conditions?
> >> Here's my case:
> >>
> >> I tried to condition on two input streams and produce a third stream
> >> if the condition is met. I implemented CoFlatMapFunction and tried to
> >> monitor a state using a field in the implemented class (I want to
> >> isolate my application from the checkpointing feature, and therefore I
> >> do not use the states as documented here:
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> >> The field served as a flag indicating whether there is pending data
> >> from either input stream; if so, that data was processed together
> >> with the data arriving from the other input stream (the processing
> >> invokes a native function).
> >>
> >> But then I got a double free error and a segmentation fault, which I
> >> believe was due to unintentional concurrent access to the native
> >> function. Then I tried to wrap the access into a synchronized method,
> >> as well as explicitly lock/unlock in the flatMap1/flatMap2 methods, but
> >> all in vain, and the error remained.
> >>
> >> I considered using CoProcessFunction in my case, but it seems to me that
> >> it does not support custom internal state, as the javadoc states: "The
> >> context [CoProcessFunction.Context] is only valid during the invocation
> >> of this method, do not store it."
> >>
> >>
> >>
> >> Thanks,
> >> Chao
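For reference, here is a minimal plain-Java model of the failure described in the thread: several function objects (one per parallel task) running on different threads but calling into one process-wide resource, standing in for the JNI library. A `synchronized` method locks only its own instance, so it does not serialize those calls, while a single static lock does. `SharedNative`, `SketchFunction`, `GLOBAL_LOCK` and `run` are hypothetical names, not Flink or JNI APIs. The fix actually adopted in the thread was a C++ mutex inside the native function; the static Java lock is only an assumed Java-side equivalent.

```java
import java.util.ArrayList;
import java.util.List;

final class StaticLockSketch {

    /** Stand-in for native state that exists once per JVM process (hypothetical). */
    static final class SharedNative {
        static int calls = 0; // unsynchronized shared state, like a native buffer

        static void call() {
            int c = calls;  // read
            Thread.yield(); // widen the race window
            calls = c + 1;  // write: lost if another thread interleaved
        }
    }

    /** One lock for the whole process, shared by every function instance. */
    static final Object GLOBAL_LOCK = new Object();

    static final class SketchFunction {
        // Locks on "this": each instance has its own monitor, so two
        // instances can still enter SharedNative.call() at the same time.
        synchronized void processWithInstanceLock() {
            SharedNative.call();
        }

        // Locks on one static object: calls from all instances are serialized.
        void processWithStaticLock() {
            synchronized (GLOBAL_LOCK) {
                SharedNative.call();
            }
        }
    }

    /** Runs `parallelism` instances, each on its own thread, `events` calls each. */
    static int run(int parallelism, int events, boolean staticLock) {
        SharedNative.calls = 0;
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            SketchFunction f = new SketchFunction(); // separate object per task
            Thread t = new Thread(() -> {
                for (int e = 0; e < events; e++) {
                    if (staticLock) {
                        f.processWithStaticLock();
                    } else {
                        f.processWithInstanceLock();
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join() also makes the threads' writes visible here
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", ex);
            }
        }
        return SharedNative.calls;
    }

    public static void main(String[] args) {
        // With the static lock no increment is lost: prints 40000.
        System.out.println(run(4, 10_000, true));
        // With per-instance locks increments can be lost; the result varies.
        System.out.println(run(4, 10_000, false));
    }
}
```

The per-instance variant mirrors why the `synchronized` attempts above had no effect: every parallel task held a different lock object.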