Thank you, Nico! That helps me a lot.

2a) That really clarifies my understanding of Flink. Yes, I think I did use static references: I invoked a native function (implemented through JNI), which I believe has only one instance per process. And I guess the Java synchronization mechanisms were in vain because there are separate function objects at runtime, which means separate lock objects. I now use a C++ mutex inside the native function, and that resolves my case.
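
To illustrate what I think happened, here is a minimal sketch in plain Java (no Flink involved). MyFunction, SHARED_LOCK and the method names are hypothetical stand-ins I made up, not Flink API, and I am assuming that the two function objects live in the same JVM. A synchronized instance method locks only on its own object, so a second object is not blocked by it, whereas a static lock is shared by all objects in the process:

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: shows which callers a per-object monitor and a static lock exclude.
class LockScopeSketch {

    // Hypothetical stand-in for a user function; one object per parallel task.
    static class MyFunction {
        // One lock per JVM, shared by every MyFunction object.
        static final ReentrantLock SHARED_LOCK = new ReentrantLock();

        // Locks on `this`: excludes only callers using the same object.
        synchronized boolean callGuardedByInstance() {
            return true;
        }

        // Returns true only if the process-wide lock was free at call time.
        boolean tryCallGuardedByStaticLock() {
            if (!SHARED_LOCK.tryLock()) {
                return false;
            }
            try {
                return true;
            } finally {
                SHARED_LOCK.unlock();
            }
        }
    }

    // Starts the task on a new thread and waits for it to finish.
    private static void runAndJoin(Runnable task) {
        Thread t = new Thread(task);
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Holds the monitor of object `a` while another thread calls into object `b`.
    static boolean otherInstanceEntersWhileInstanceLockHeld() {
        MyFunction a = new MyFunction();
        MyFunction b = new MyFunction();
        boolean[] entered = {false};
        synchronized (a) {
            // Completes without blocking: b's monitor is not a's monitor.
            runAndJoin(() -> entered[0] = b.callGuardedByInstance());
        }
        return entered[0];
    }

    // Holds the static lock while another thread calls into a different object.
    static boolean otherInstanceEntersWhileStaticLockHeld() {
        MyFunction b = new MyFunction();
        boolean[] entered = {true};
        MyFunction.SHARED_LOCK.lock();
        try {
            runAndJoin(() -> entered[0] = b.tryCallGuardedByStaticLock());
        } finally {
            MyFunction.SHARED_LOCK.unlock();
        }
        return entered[0];
    }

    public static void main(String[] args) {
        System.out.println("instance lock lets other object in: "
                + otherInstanceEntersWhileInstanceLockHeld());
        System.out.println("static lock lets other object in:   "
                + otherInstanceEntersWhileStaticLockHeld());
    }
}
```

A static lock like this would only cover function objects inside one JVM. Since the native function is per process as well, locking inside the native code itself (as I do now) seemed like the more direct fix.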

BTW, could you elaborate a bit on what you mean by "per-record base"? What do you mean by a record?

3) I do not intend to store the CoProcessFunction.Context. I was just wondering: since the documentation says it is only valid during the invocation, I guess I cannot use it to maintain the custom state of my program logic.


Thank you,
Chao


On 08/16/2017 03:31 AM, Nico Kruber wrote:
Hi Chao,

1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
quote the javadoc of the CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the
time (both event and processing) and set timers, through the provided {@link
Context}. When reacting to the firing of set timers the function can emit yet
more elements."

So, imho, both deliver a different level of abstraction and control (high- vs.
low-level). Also note the different methods available for you to implement.

2a) In general, Flink calls functions on a per-record base in a serialized
fashion per task. For each task at a TaskManager, in case of it having
multiple slots, separate function objects are used, so you should only get
into trouble if you share static references. Otherwise you do not need to
worry about thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should
apply to CoFlatMapFunction and CoProcessFunction, so that flatMap1/2
and processElement1/2 are not called in parallel!

3) why would you want to store the CoProcessFunction.Context?


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
Hi,

I'd like to know whether CoFlatMapFunction/CoProcessFunction is thread-safe,
and to what extent. What is the difference between the two functions? And
in general, how does Flink prevent race conditions? Here's my case:

I tried to condition on two input streams and produce a third stream
if the condition is met. I implemented CoFlatMapFunction and tried to
track state using a field in the implementing class (I want to
isolate my application from the checkpointing feature, and therefore I
do not use the state as documented here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
The field serves as a flag indicating whether there is pending
data from either input stream; if so, it is processed along with the
arriving data from the other input stream (the processing invokes a native
function).

But then I got a double free error and a segmentation fault, which I believe
were due to unintended concurrent access to the native function. I then
tried wrapping the access in a synchronized method, as well as
explicitly locking/unlocking in the flatMap1/flatMap2 methods, but all in
vain: the error remained.

I considered using CoProcessFunction in my case, but it seems to me that it
does not handle custom internal state, since the javadoc states: "The
context [CoProcessFunction.Context] is only valid during the invocation
of this method, do not store it."



Thanks,
Chao
