[ https://issues.apache.org/jira/browse/FLINK-10840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681395#comment-16681395 ]
Chesnay Schepler commented on FLINK-10840:
------------------------------------------

[~yanghua] are you asking whether
{code}bucketState.pendingFiles = new ArrayList<>();{code}
modifies the collection passed in
{code}bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);{code}?

> BucketingSink incorrectly clears the pendingFiles List
> ------------------------------------------------------
>
>                 Key: FLINK-10840
>                 URL: https://issues.apache.org/jira/browse/FLINK-10840
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> BucketingSink#snapshotState : (see the *comment* in this method)
> {code:java}
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
>     restoredBucketStates.clear();
>     synchronized (state.bucketStates) {
>         int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
>         for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
>             BucketState<T> bucketState = bucketStateEntry.getValue();
>             if (bucketState.isWriterOpen) {
>                 bucketState.currentFileValidLength = bucketState.writer.flush();
>             }
>             synchronized (bucketState.pendingFilesPerCheckpoint) {
>                 bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
>             }
>             // This operation will make this collection prematurely emptied
>             bucketState.pendingFiles = new ArrayList<>();
>         }
>         restoredBucketStates.add(state);
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
>         }
>     }
> }
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
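
The question in the comment above comes down to plain Java reference semantics, which can be checked outside of Flink. The following is a minimal sketch, not the BucketingSink code: `PendingFilesDemo`, `DemoBucketState`, the helper methods, and the file names are hypothetical stand-ins that keep only the two fields under discussion. It contrasts reassigning the field (the pattern in `snapshotState`) with calling `clear()` on the list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFilesDemo {

    /** Hypothetical stand-in for BucketState, reduced to the two fields in question. */
    static class DemoBucketState {
        List<String> pendingFiles = new ArrayList<>();
        final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    }

    /** Builds a state with two made-up pending file names. */
    private static DemoBucketState newStateWithTwoFiles() {
        DemoBucketState bucketState = new DemoBucketState();
        bucketState.pendingFiles.add("part-0-0");
        bucketState.pendingFiles.add("part-0-1");
        return bucketState;
    }

    /** Same pattern as in snapshotState: store the list, then point the field at a new list. */
    static DemoBucketState snapshotWithReassign(long checkpointId) {
        DemoBucketState bucketState = newStateWithTwoFiles();
        bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
        // Rebinds the field only; the list object held by the map is untouched.
        bucketState.pendingFiles = new ArrayList<>();
        return bucketState;
    }

    /** Contrast case: clear() mutates the same list object the map still references. */
    static DemoBucketState snapshotWithClear(long checkpointId) {
        DemoBucketState bucketState = newStateWithTwoFiles();
        bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
        bucketState.pendingFiles.clear();
        return bucketState;
    }

    public static void main(String[] args) {
        // Prints [part-0-0, part-0-1]: the stored list survives the reassignment.
        System.out.println(snapshotWithReassign(1L).pendingFilesPerCheckpoint.get(1L));
        // Prints []: clearing the shared list also empties the map entry.
        System.out.println(snapshotWithClear(1L).pendingFilesPerCheckpoint.get(1L));
    }
}
```

In this sketch, only the `clear()` variant empties the list stored under the checkpoint ID. The sketch covers these two statements in isolation and says nothing about the rest of the sink's checkpointing logic.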