[ https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15822346#comment-15822346 ]
Ted Yu edited comment on FLINK-5486 at 4/12/17 4:25 PM:
--------------------------------------------------------

Lock on State.bucketStates should be held in the following method:
{code}
  private void handleRestoredBucketState(State<T> restoredState) {
    Preconditions.checkNotNull(restoredState);

    for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
{code}

was (Author: yuzhih...@gmail.com):
Lock on State.bucketStates should be held in the following method:
{code}
  private void handleRestoredBucketState(State<T> restoredState) {
    Preconditions.checkNotNull(restoredState);

    for (BucketState<T> bucketState : restoredState.bucketStates.values()) {
{code}

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> --------------------------------------------------------------------
>
>                 Key: FLINK-5486
>                 URL: https://issues.apache.org/jira/browse/FLINK-5486
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: Ted Yu
>
> Here is related code:
> {code}
>       handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>       synchronized (bucketState.pendingFilesPerCheckpoint) {
>         bucketState.pendingFilesPerCheckpoint.clear();
>       }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside
> the synchronization block. Otherwise during the processing of
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be
> cleared.
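For illustration, the locking proposed in the comment and the issue description could look roughly like the sketch below. It is not the actual BucketingSink code: the class RestoreSketch is hypothetical, plain maps stand in for State<T> and BucketState<T>, and handlePendingFilesForPreviousCheckpoints() is reduced to collecting file names. It only shows the proposed shape, with the lock on the bucketStates map held for the whole iteration and the handling call moved inside the same synchronized block as clear().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the restore path; not the actual BucketingSink code.
class RestoreSketch {

    // Stand-in for the real method: here it only collects the pending file names.
    static List<String> handlePendingFilesForPreviousCheckpoints(
            Map<Long, List<String>> pendingFilesPerCheckpoint) {
        List<String> handled = new ArrayList<>();
        for (List<String> files : pendingFilesPerCheckpoint.values()) {
            handled.addAll(files);
        }
        return handled;
    }

    // bucketStates maps a bucket path to that bucket's pendingFilesPerCheckpoint map.
    static List<String> handleRestoredBucketState(
            Map<String, Map<Long, List<String>>> bucketStates) {
        List<String> handled = new ArrayList<>();
        // Hold the lock on bucketStates while iterating over its values.
        synchronized (bucketStates) {
            for (Map<Long, List<String>> pendingFilesPerCheckpoint : bucketStates.values()) {
                // Process and clear under one lock, so that no other thread
                // synchronizing on this map can modify it between the two steps.
                synchronized (pendingFilesPerCheckpoint) {
                    handled.addAll(
                            handlePendingFilesForPreviousCheckpoints(pendingFilesPerCheckpoint));
                    pendingFilesPerCheckpoint.clear();
                }
            }
        }
        return handled;
    }
}
```

The locks only help if every other code path that touches these maps synchronizes on the same objects. Whether such concurrent access actually occurs during restore is not established by the sketch.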