[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636006#comment-15636006 ]
ASF GitHub Bot commented on FLINK-4391: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86524130 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java --- @@ -540,15 +540,12 @@ private boolean performCheckpoint(CheckpointMetaData checkpointMetaData) throws synchronized (lock) { if (isRunning) { + checkpointState(checkpointMetaData); - // Since both state checkpointing and downstream barrier emission occurs in this - // lock scope, they are an atomic operation regardless of the order in which they occur. - // Given this, we immediately emit the checkpoint barriers, so the downstream operators - // can start their checkpoint work as soon as possible + // broadcast barriers after snapshot operators' states. operatorChain.broadcastCheckpointBarrier( - checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp()); - - checkpointState(checkpointMetaData); + checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp() + ); --- End diff -- I think the `ReentrantReadWriteLock` could work. However, I'm not so sure whether the higher costs of this lock compared to a mutual exclusion lock we're currently using is worth the change. I fear that we're optimising here for the case where you have a long chain of `AsyncWaitOperators`. Instead we could simply disallow chaining for these operators. Then every chain would have at most 2 write threads (main and `Emitter`) competing for the lock. Thus, I would vote for using the existing mutual exclusion lock instead. > Provide support for asynchronous operations over streams > -------------------------------------------------------- > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Jamie Grier > Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)