vinoyang created FLINK-10840:
--------------------------------

             Summary: BucketingSink incorrectly clears the pendingFiles List
                 Key: FLINK-10840
                 URL: https://issues.apache.org/jira/browse/FLINK-10840
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
            Reporter: vinoyang
            Assignee: vinoyang


Relevant code from BucketingSink#snapshotState:
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
   Preconditions.checkNotNull(restoredBucketStates, "The operator has not been 
properly initialized.");

   restoredBucketStates.clear();

   synchronized (state.bucketStates) {
      int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

      for (Map.Entry<String, BucketState<T>> bucketStateEntry : 
state.bucketStates.entrySet()) {
         BucketState<T> bucketState = bucketStateEntry.getValue();

         if (bucketState.isWriterOpen) {
            bucketState.currentFileValidLength = bucketState.writer.flush();
         }

         synchronized (bucketState.pendingFilesPerCheckpoint) {
            
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
bucketState.pendingFiles);
         }
         //This operation will make this collection prematurely emptied
         bucketState.pendingFiles = new ArrayList<>();
      }
      restoredBucketStates.add(state);

      if (LOG.isDebugEnabled()) {
         LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), 
subtaskIdx, state);
      }
   }
}
{code}
 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to