[ 
https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642997#comment-15642997
 ] 

ASF GitHub Bot commented on FLINK-4391:
---------------------------------------

Github user bjlovegithub commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2629#discussion_r86709933
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ---
    @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData 
checkpointMetaData) throws
     
                synchronized (lock) {
                        if (isRunning) {
    +                           checkpointState(checkpointMetaData);
     
    -                           // Since both state checkpointing and 
downstream barrier emission occurs in this
    -                           // lock scope, they are an atomic operation 
regardless of the order in which they occur.
    -                           // Given this, we immediately emit the 
checkpoint barriers, so the downstream operators
    -                           // can start their checkpoint work as soon as 
possible
    +                           // broadcast barriers after snapshot operators' 
states.
                                operatorChain.broadcastCheckpointBarrier(
    -                                           
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp());
    -
    -                           checkpointState(checkpointMetaData);
    +                                           
checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()
    +                           );
    --- End diff --
    
    Can you share the test program and test result? I write an sample program 
to test the performance to acquire and release lock with `Object` / 
`ReentrantLock` / `ReentrantReadWriteLock`, finding that for one thread to use 
the lock, they have similar results. But referring to multiple thread cases, 
`ReentrantLock` and ` ReentrantReadWriteLock` excels `Object` lock.
    Here is my benchmark program: 
[link](https://github.com/bjlovegithub/JavaLockTest/blob/master/src/LockTest.java)
    And this is the sampled results run on my laptop: 
[stat](https://github.com/bjlovegithub/JavaLockTest/blob/master/stat/LockTest_result.data)


> Provide support for asynchronous operations over streams
> --------------------------------------------------------
>
>                 Key: FLINK-4391
>                 URL: https://issues.apache.org/jira/browse/FLINK-4391
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Jamie Grier
>            Assignee: david.wang
>
> Many Flink users need to do asynchronous processing driven by data from a 
> DataStream.  The classic example would be joining against an external 
> database in order to enrich a stream with extra information.
> It would be nice to add general support for this type of operation in the 
> Flink API.  Ideally this could simply take the form of a new operator that 
> manages async operations, keeps so many of them in flight, and then emits 
> results to downstream operators as the async operations complete.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to