vinoyang created FLINK-10840:
--------------------------------

             Summary: BucketingSink incorrectly clears the pendingFiles List
                 Key: FLINK-10840
                 URL: https://issues.apache.org/jira/browse/FLINK-10840
             Project: Flink
          Issue Type: Bug
          Components: Streaming Connectors
            Reporter: vinoyang
            Assignee: vinoyang
BucketingSink#snapshotState:
{code:java}
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");

    restoredBucketStates.clear();

    synchronized (state.bucketStates) {
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

        for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
            BucketState<T> bucketState = bucketStateEntry.getValue();

            if (bucketState.isWriterOpen) {
                bucketState.currentFileValidLength = bucketState.writer.flush();
            }

            synchronized (bucketState.pendingFilesPerCheckpoint) {
                bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
            }

            // This operation empties this collection prematurely.
            bucketState.pendingFiles = new ArrayList<>();
        }

        restoredBucketStates.add(state);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state);
        }
    }
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)