gaoyunhaii commented on a change in pull request #13740: URL: https://github.com/apache/flink/pull/13740#discussion_r516044747
########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bucket is the directory organization of the output of the {@link FileSink}. + * + * <p>For each incoming element in the {@code FileSink}, the user-specified + * {@link BucketAssigner} is queried to see in which bucket this element should be written to. + */ +@Internal +class FileWriterBucket<IN, BucketID> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class); + + private final BucketID bucketId; + + private final Path bucketPath; + + private final String uniqueId; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + private final OutputFileConfig outputFileConfig; + + private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>(); + + private long partCounter; + + @Nullable + private InProgressFileRecoverable inProgressFileToCleanup; + + @Nullable + private InProgressFileWriter<IN, BucketID> inProgressPart; + + /** + * Constructor to create a new empty bucket. 
+ */ + private FileWriterBucket( + BucketID bucketId, + Path bucketPath, + String uniqueId, + BucketWriter<IN, BucketID> bucketWriter, + RollingPolicy<IN, BucketID> rollingPolicy, + OutputFileConfig outputFileConfig) { + this.bucketId = checkNotNull(bucketId); + this.bucketPath = checkNotNull(bucketPath); + this.uniqueId = checkNotNull(uniqueId); Review comment: Currently we have no requirement to set it directly, so I moved the parameter to be initialized inside the `FileWriterBucket`. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bucket is the directory organization of the output of the {@link FileSink}. + * + * <p>For each incoming element in the {@code FileSink}, the user-specified + * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
+ */ +@Internal +class FileWriterBucket<IN, BucketID> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class); + + private final BucketID bucketId; + + private final Path bucketPath; + + private final String uniqueId; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + private final OutputFileConfig outputFileConfig; + + private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>(); + + private long partCounter; + + @Nullable + private InProgressFileRecoverable inProgressFileToCleanup; + + @Nullable + private InProgressFileWriter<IN, BucketID> inProgressPart; + + /** + * Constructor to create a new empty bucket. + */ + private FileWriterBucket( + BucketID bucketId, + Path bucketPath, + String uniqueId, + BucketWriter<IN, BucketID> bucketWriter, + RollingPolicy<IN, BucketID> rollingPolicy, + OutputFileConfig outputFileConfig) { + this.bucketId = checkNotNull(bucketId); + this.bucketPath = checkNotNull(bucketPath); + this.uniqueId = checkNotNull(uniqueId); Review comment: Currently we have no requirement to set it directly, so I moved the parameter to be initialized inside the `FileWriterBucket`. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A bucket is the directory organization of the output of the {@link FileSink}. + * + * <p>For each incoming element in the {@code FileSink}, the user-specified + * {@link BucketAssigner} is queried to see in which bucket this element should be written to. + */ +@Internal +class FileWriterBucket<IN, BucketID> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class); + + private final BucketID bucketId; + + private final Path bucketPath; + + private final String uniqueId; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + private final OutputFileConfig outputFileConfig; + + private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>(); + + private long partCounter; + + @Nullable + private InProgressFileRecoverable inProgressFileToCleanup; + + @Nullable + private InProgressFileWriter<IN, BucketID> inProgressPart; + + /** + * Constructor to create a new empty bucket. + */ + private FileWriterBucket( + BucketID bucketId, + Path bucketPath, + String uniqueId, + BucketWriter<IN, BucketID> bucketWriter, + RollingPolicy<IN, BucketID> rollingPolicy, + OutputFileConfig outputFileConfig) { + this.bucketId = checkNotNull(bucketId); + this.bucketPath = checkNotNull(bucketPath); + this.uniqueId = checkNotNull(uniqueId); + this.bucketWriter = checkNotNull(bucketWriter); + this.rollingPolicy = checkNotNull(rollingPolicy); + this.outputFileConfig = checkNotNull(outputFileConfig); + + this.partCounter = 0; + } + + /** + * Constructor to restore a bucket from checkpointed state. 
+ */ + private FileWriterBucket( + String uniqueId, + BucketWriter<IN, BucketID> partFileFactory, + RollingPolicy<IN, BucketID> rollingPolicy, + FileWriterBucketState<BucketID> bucketState, + OutputFileConfig outputFileConfig) throws IOException { + + this( + bucketState.getBucketId(), + bucketState.getBucketPath(), + uniqueId, + partFileFactory, + rollingPolicy, + outputFileConfig); + + restoreInProgressFile(bucketState); + } + + private void restoreInProgressFile(FileWriterBucketState<BucketID> state) throws IOException { + if (!state.hasInProgressFileRecoverable()) { + return; + } + + // we try to resume the previous in-progress file + InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = + state.getInProgressFileRecoverable(); + + if (bucketWriter.getProperties().supportsResume()) { + inProgressPart = bucketWriter.resumeInProgressFileFrom( + bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime()); + } else { + pendingFiles.add(inProgressFileRecoverable); + } + } + + public BucketID getBucketId() { + return bucketId; + } + + public Path getBucketPath() { + return bucketPath; + } + + public long getPartCounter() { + return partCounter; + } + + public boolean isActive() { + return inProgressPart != null || inProgressFileToCleanup != null || pendingFiles.size() > 0; + } + + void merge(final FileWriterBucket<IN, BucketID> bucket) throws IOException { + checkNotNull(bucket); + checkState(Objects.equals(bucket.bucketPath, bucketPath)); + + bucket.closePartFile(); + pendingFiles.addAll(bucket.pendingFiles); + + if (LOG.isDebugEnabled()) { + LOG.debug("Merging buckets for bucket id={}", bucketId); + } + } + + void write(IN element) throws IOException { + long now = System.currentTimeMillis(); + if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "closing in-progress part file for bucket id={} due to element {}.", + bucketId, + element); + } + + inProgressPart = rollPartFile(now); + } + + inProgressPart.write(element, now); + } + + List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + if (inProgressPart != null && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) + || flush)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Closing in-progress part file for bucket id={} on checkpoint.", + bucketId); + } + closePartFile(); + } + + List<FileSinkCommittable> committables = new ArrayList<>(); + pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile))); + pendingFiles.clear(); + + if (inProgressFileToCleanup != null) { + committables.add(new FileSinkCommittable(inProgressFileToCleanup)); + inProgressFileToCleanup = null; + } + + return committables; + } + + FileWriterBucketState<BucketID> snapshotState() throws IOException { + InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null; + long inProgressFileCreationTime = Long.MAX_VALUE; + + if (inProgressPart != null) { + inProgressFileRecoverable = inProgressPart.persist(); + inProgressFileToCleanup = inProgressFileRecoverable; + inProgressFileCreationTime = inProgressPart.getCreationTime(); + } + + return new FileWriterBucketState<>( + bucketId, + bucketPath, + inProgressFileCreationTime, + inProgressFileRecoverable); + } + + private InProgressFileWriter<IN, BucketID> rollPartFile(long currentTime) throws IOException { + closePartFile(); + + final Path partFilePath = assembleNewPartPath(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Opening new 
part file \"{}\" for bucket id={}.", + partFilePath.getName(), bucketId); + } + + return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime); + } + + /** + * Constructor a new PartPath and increment the partCounter. + */ + private Path assembleNewPartPath() { + long currentPartCounter = partCounter++; + return new Path( + bucketPath, + outputFileConfig.getPartPrefix() + '-' + uniqueId + '-' + currentPartCounter + + outputFileConfig.getPartSuffix()); + } + + private void closePartFile() throws IOException { + if (inProgressPart != null) { + InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = inProgressPart.closeForCommit(); + pendingFiles.add(pendingFileRecoverable); + inProgressPart = null; + } + } + + void disposePartFile() { + if (inProgressPart != null) { + inProgressPart.dispose(); + } + } + + // --------------------------- Testing Methods ----------------------------- + + @VisibleForTesting + public String getUniqueId() { + return uniqueId; + } + + @Nullable + @VisibleForTesting + InProgressFileWriter<IN, BucketID> getInProgressPart() { + return inProgressPart; + } + Review comment: Added `@VisibleForTesting` for all the testing methods. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writer implementation for {@link FileSink}. 
+ */ +public class FileWriter<IN, BucketID> + implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + + // ------------------------ configuration fields -------------------------- + + private final Path basePath; + + private final FileWriterBucketFactory<IN, BucketID> bucketFactory; + + private final BucketAssigner<IN, BucketID> bucketAssigner; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + // --------------------------- runtime fields ----------------------------- + + private final BucketerContext bucketerContext; + + private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets; + + private final OutputFileConfig outputFileConfig; + + // --------------------------- State Related Fields ----------------------------- + + private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer; + + /** + * A constructor creating a new empty bucket manager. + * + * @param basePath The base path for our buckets. + * @param bucketAssigner The {@link BucketAssigner} provided by the user. + * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets. + * @param bucketWriter The {@link BucketWriter} to be used when writing data. + * @param rollingPolicy The {@link RollingPolicy} as specified by the user. + */ + public FileWriter( + final Path basePath, + final BucketAssigner<IN, BucketID> bucketAssigner, + final FileWriterBucketFactory<IN, BucketID> bucketFactory, + final BucketWriter<IN, BucketID> bucketWriter, + final RollingPolicy<IN, BucketID> rollingPolicy, + final OutputFileConfig outputFileConfig) { + + this.basePath = checkNotNull(basePath); + this.bucketAssigner = checkNotNull(bucketAssigner); + this.bucketFactory = checkNotNull(bucketFactory); + this.bucketWriter = checkNotNull(bucketWriter); + this.rollingPolicy = checkNotNull(rollingPolicy); + + this.outputFileConfig = checkNotNull(outputFileConfig); + + this.activeBuckets = new HashMap<>(); + this.bucketerContext = new BucketerContext(); + + this.bucketStateSerializer = new FileWriterBucketStateSerializer<>( + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketAssigner.getSerializer()); + } + + /** + * Initializes the state after recovery from a failure. + * + * <p>During this process: + * <ol> + * <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets. + * This guarantees that we do not overwrite valid data,</li> + * <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li> + * <li>we resume writing to the previous in-progress file of each bucket, and</li> + * <li>if we receive multiple states for the same bucket, we merge them.</li> + * </ol> + * + * @param bucketStates the state holding recovered state about active buckets. 
+ * + * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any + * in-progress/pending part files + */ + public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException { + for (FileWriterBucketState<BucketID> state : bucketStates) { + BucketID bucketId = state.getBucketId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring: {}", state); + } + + FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket( + bucketWriter, + rollingPolicy, + state, + outputFileConfig); + + updateActiveBucketId(bucketId, restoredBucket); + } + } + + private void updateActiveBucketId( + BucketID bucketId, + FileWriterBucket<IN, BucketID> restoredBucket) throws IOException { + final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId); + if (bucket != null) { + bucket.merge(restoredBucket); + } else { + activeBuckets.put(bucketId, restoredBucket); + } + } + + @Override + public void write(IN element, Context context) throws IOException { + // setting the values in the bucketer context + bucketerContext.update( + context.timestamp(), + context.currentWatermark()); + + final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext); + final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId); + bucket.write(element); + } + + @Override + public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + List<FileSinkCommittable> committables = new ArrayList<>(); + + // Every time before we prepare commit, we first check and remove the inactive + // buckets. Checking the activeness right before pre-committing avoid re-creating + // the bucket every time if the bucket use OnCheckpointingRollingPolicy. + Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt = + activeBuckets.entrySet().iterator(); + while (activeBucketIt.hasNext()) { Review comment: One concern for a bucket that only has pending files: since we check it after sending the committables, it would always be inactive, so we would have to remove it and add a new one in the next checkpoint period. Do you think this would be a problem? If we delay the check by one checkpoint period, we would only remove a bucket that has not received records for one checkpoint period. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink.committer; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Committer implementation for {@link FileSink}. + */ +public class FileCommitter implements Committer<FileSinkCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class); + + private final BucketWriter<?, ?> bucketWriter; + + public FileCommitter(BucketWriter<?, ?> bucketWriter) { + this.bucketWriter = checkNotNull(bucketWriter); + } + + @Override + public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) { + List<FileSinkCommittable> needRetry = new ArrayList<>(); + for (FileSinkCommittable committable : committables) { + if (committable.hasPendingFile()) { + // We should always use commitAfterRecovery which contains additional checks. + try { + bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery(); + } catch (IOException e) { + LOG.error("Failed to commit {}", committable.getPendingFile()); + needRetry.add(committable); Review comment: Agreed that we should not use retry for now; I changed it to return an empty list. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.committer; + +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Committer implementation for {@link FileSink}.
+ */ +public class FileCommitter implements Committer<FileSinkCommittable> { + private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class); + + private final BucketWriter<?, ?> bucketWriter; + + public FileCommitter(BucketWriter<?, ?> bucketWriter) { + this.bucketWriter = checkNotNull(bucketWriter); + } + + @Override + public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) { + List<FileSinkCommittable> needRetry = new ArrayList<>(); + for (FileSinkCommittable committable : committables) { + if (committable.hasPendingFile()) { + // We should always use commitAfterRecovery which contains additional checks. + try { + bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery(); + } catch (IOException e) { + LOG.error("Failed to commit {}", committable.getPendingFile()); + needRetry.add(committable); Review comment: Agreed that we should not use retry for now; I changed it to return an empty list. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.file.sink; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.connector.file.sink.committer.FileCommitter; +import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory; +import org.apache.flink.connector.file.sink.writer.FileWriter; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketState; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A unified sink for both streaming and blocking mode, based on the new Sink API. + */ +@Experimental +public class FileSink<IN, BucketID> + implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> { + + private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder; + + private FileSink(BucketsBuilder<IN, BucketID, ? 
extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) { + this.bucketsBuilder = checkNotNull(bucketsBuilder); + } + + @Override + public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter( + InitContext context, + List<FileWriterBucketState<BucketID>> states) throws IOException { + FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context); + writer.initializeState(states); + return writer; + } + + @Override + public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer() + throws IOException { + return Optional.of(bucketsBuilder.getWriterStateSerializer()); + } + + @Override + public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException { + return Optional.of(bucketsBuilder.createCommitter()); + } + + @Override + public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() + throws IOException { + return Optional.of(bucketsBuilder.getCommittableSerializer()); + } + + @Override + public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + public static <IN> DefaultRowFormatBuilder<IN> forRowFormat( + final Path basePath, final Encoder<IN> encoder) { + return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>()); + } + + /** + * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. + */ + @Internal + private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>> + implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L; + + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + + @Internal + protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException; + + @Internal + protected abstract FileCommitter createCommitter() throws IOException; + + @Internal + protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException; + + @Internal + protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException; + } + + /** + * A builder for configuring the sink for row-wise encoding formats. 
+ */ + public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>> + extends BucketsBuilder<IN, BucketID, T> { + + private static final long serialVersionUID = 1L; + + private final Path basePath; + + private final Encoder<IN> encoder; + + private BucketAssigner<IN, BucketID> bucketAssigner; + + private RollingPolicy<IN, BucketID> rollingPolicy; + + private FileWriterBucketFactory<IN, BucketID> bucketFactory; + + private OutputFileConfig outputFileConfig; + + protected RowFormatBuilder( + Path basePath, + Encoder<IN> encoder, + BucketAssigner<IN, BucketID> bucketAssigner) { + this( + basePath, + encoder, + bucketAssigner, + DefaultRollingPolicy.builder().build(), + new DefaultFileWriterBucketFactory<>(), + OutputFileConfig.builder().build()); + } + + protected RowFormatBuilder( + Path basePath, + Encoder<IN> encoder, + BucketAssigner<IN, BucketID> assigner, + RollingPolicy<IN, BucketID> policy, + FileWriterBucketFactory<IN, BucketID> bucketFactory, + OutputFileConfig outputFileConfig) { + this.basePath = checkNotNull(basePath); + this.encoder = checkNotNull(encoder); + this.bucketAssigner = checkNotNull(assigner); + this.rollingPolicy = checkNotNull(policy); + this.bucketFactory = checkNotNull(bucketFactory); + this.outputFileConfig = checkNotNull(outputFileConfig); + } + + public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) { + this.bucketAssigner = checkNotNull(assigner); + return self(); + } + + public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) { + this.rollingPolicy = checkNotNull(policy); + return self(); + } + + public T withOutputFileConfig(final OutputFileConfig outputFileConfig) { + this.outputFileConfig = outputFileConfig; + return self(); + } + + public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy( + BucketAssigner<IN, BucketID> assigner, + RollingPolicy<IN, BucketID> policy) { + Preconditions.checkState( + bucketFactory.getClass() == DefaultFileWriterBucketFactory.class, + "newBuilderWithBucketAssignerAndPolicy() cannot be called " + + "after specifying a customized bucket factory"); + return new RowFormatBuilder<>( + basePath, + encoder, + checkNotNull(assigner), + checkNotNull(policy), + bucketFactory, + outputFileConfig); + } + + @VisibleForTesting + T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) { + this.bucketFactory = Preconditions.checkNotNull(factory); + return self(); + } + + /** Creates the actual sink. */ + public FileSink<IN, BucketID> build() { + return new FileSink<>(this); + } + + @Override + public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException { Review comment: It seems we could not change the modifier since the method is inherited from an interface~? ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java ########## @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.connector.file.sink.committer.FileCommitter; +import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory; +import org.apache.flink.connector.file.sink.writer.FileWriter; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketState; +import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A unified sink for both streaming and blocking mode, based on the new Sink API. + */ +@Experimental +public class FileSink<IN, BucketID> + implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> { + + private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder; + + private FileSink(BucketsBuilder<IN, BucketID, ? 
extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) { + this.bucketsBuilder = checkNotNull(bucketsBuilder); + } + + @Override + public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter( + InitContext context, + List<FileWriterBucketState<BucketID>> states) throws IOException { + FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context); + writer.initializeState(states); + return writer; + } + + @Override + public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer() + throws IOException { + return Optional.of(bucketsBuilder.getWriterStateSerializer()); + } + + @Override + public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException { + return Optional.of(bucketsBuilder.createCommitter()); + } + + @Override + public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() + throws IOException { + return Optional.of(bucketsBuilder.getCommittableSerializer()); + } + + @Override + public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + public static <IN> DefaultRowFormatBuilder<IN> forRowFormat( + final Path basePath, final Encoder<IN> encoder) { + return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>()); + } + + /** + * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}. + */ + @Internal + private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>> + implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L; + + @SuppressWarnings("unchecked") + protected T self() { + return (T) this; + } + + @Internal + protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException; + + @Internal + protected abstract FileCommitter createCommitter() throws IOException; + + @Internal + protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException; + + @Internal + protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException; + } + + /** + * A builder for configuring the sink for row-wise encoding formats. 
+ */ + public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>> + extends BucketsBuilder<IN, BucketID, T> { + + private static final long serialVersionUID = 1L; + + private final Path basePath; + + private final Encoder<IN> encoder; + + private BucketAssigner<IN, BucketID> bucketAssigner; + + private RollingPolicy<IN, BucketID> rollingPolicy; + + private FileWriterBucketFactory<IN, BucketID> bucketFactory; + + private OutputFileConfig outputFileConfig; + + protected RowFormatBuilder( + Path basePath, + Encoder<IN> encoder, + BucketAssigner<IN, BucketID> bucketAssigner) { + this( + basePath, + encoder, + bucketAssigner, + DefaultRollingPolicy.builder().build(), + new DefaultFileWriterBucketFactory<>(), + OutputFileConfig.builder().build()); + } + + protected RowFormatBuilder( + Path basePath, + Encoder<IN> encoder, + BucketAssigner<IN, BucketID> assigner, + RollingPolicy<IN, BucketID> policy, + FileWriterBucketFactory<IN, BucketID> bucketFactory, + OutputFileConfig outputFileConfig) { + this.basePath = checkNotNull(basePath); + this.encoder = checkNotNull(encoder); + this.bucketAssigner = checkNotNull(assigner); + this.rollingPolicy = checkNotNull(policy); + this.bucketFactory = checkNotNull(bucketFactory); + this.outputFileConfig = checkNotNull(outputFileConfig); + } + + public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) { + this.bucketAssigner = checkNotNull(assigner); + return self(); + } + + public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) { + this.rollingPolicy = checkNotNull(policy); + return self(); + } + + public T withOutputFileConfig(final OutputFileConfig outputFileConfig) { + this.outputFileConfig = outputFileConfig; + return self(); + } + + public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy( + BucketAssigner<IN, BucketID> assigner, + RollingPolicy<IN, BucketID> policy) { + Preconditions.checkState( + bucketFactory.getClass() == DefaultFileWriterBucketFactory.class, + "newBuilderWithBucketAssignerAndPolicy() cannot be called " + + "after specifying a customized bucket factory"); + return new RowFormatBuilder<>( + basePath, + encoder, + checkNotNull(assigner), + checkNotNull(policy), + bucketFactory, + outputFileConfig); + } + + @VisibleForTesting + T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) { + this.bucketFactory = Preconditions.checkNotNull(factory); + return self(); + } + + /** Creates the actual sink. */ + public FileSink<IN, BucketID> build() { + return new FileSink<>(this); + } + + @Override + public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException { Review comment: Yes, indeed, I changed the modifier to `package-private`. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writer implementation for {@link FileSink}. + */ +public class FileWriter<IN, BucketID> + implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + + // ------------------------ configuration fields -------------------------- + + private final Path basePath; + + private final FileWriterBucketFactory<IN, BucketID> bucketFactory; + + private final BucketAssigner<IN, BucketID> bucketAssigner; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + // --------------------------- runtime fields ----------------------------- + + private final BucketerContext bucketerContext; + + private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets; + + private final OutputFileConfig outputFileConfig; + + // --------------------------- State Related Fields ----------------------------- + + private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer; + + /** + * A constructor creating a new empty bucket manager. + * + * @param basePath The base path for our buckets. + * @param bucketAssigner The {@link BucketAssigner} provided by the user. + * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets. + * @param bucketWriter The {@link BucketWriter} to be used when writing data. + * @param rollingPolicy The {@link RollingPolicy} as specified by the user. 
+ */ + public FileWriter( + final Path basePath, + final BucketAssigner<IN, BucketID> bucketAssigner, + final FileWriterBucketFactory<IN, BucketID> bucketFactory, + final BucketWriter<IN, BucketID> bucketWriter, + final RollingPolicy<IN, BucketID> rollingPolicy, + final OutputFileConfig outputFileConfig) { + + this.basePath = checkNotNull(basePath); + this.bucketAssigner = checkNotNull(bucketAssigner); + this.bucketFactory = checkNotNull(bucketFactory); + this.bucketWriter = checkNotNull(bucketWriter); + this.rollingPolicy = checkNotNull(rollingPolicy); + + this.outputFileConfig = checkNotNull(outputFileConfig); + + this.activeBuckets = new HashMap<>(); + this.bucketerContext = new BucketerContext(); + + this.bucketStateSerializer = new FileWriterBucketStateSerializer<>( + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketAssigner.getSerializer()); + } + + /** + * Initializes the state after recovery from a failure. + * + * <p>During this process: + * <ol> + * <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets. + * This guarantees that we do not overwrite valid data,</li> + * <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li> + * <li>we resume writing to the previous in-progress file of each bucket, and</li> + * <li>if we receive multiple states for the same bucket, we merge them.</li> + * </ol> + * + * @param bucketStates the state holding recovered state about active buckets. + * + * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any + * in-progress/pending part files + */ + public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException { + for (FileWriterBucketState<BucketID> state : bucketStates) { + BucketID bucketId = state.getBucketId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring: {}", state); + } + + FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket( + bucketWriter, + rollingPolicy, + state, + outputFileConfig); + + updateActiveBucketId(bucketId, restoredBucket); + } + } + + private void updateActiveBucketId( + BucketID bucketId, + FileWriterBucket<IN, BucketID> restoredBucket) throws IOException { + final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId); + if (bucket != null) { + bucket.merge(restoredBucket); + } else { + activeBuckets.put(bucketId, restoredBucket); + } + } + + @Override + public void write(IN element, Context context) throws IOException { + // setting the values in the bucketer context + bucketerContext.update( + context.timestamp(), + context.currentWatermark()); + + final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext); + final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId); + bucket.write(element); + } + + @Override + public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + List<FileSinkCommittable> committables = new ArrayList<>(); + + // Every time before we prepare commit, we first check and remove the inactive + // buckets. Checking the activeness right before pre-committing avoid re-creating + // the bucket every time if the bucket use OnCheckpointingRollingPolicy. 
+ Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt = + activeBuckets.entrySet().iterator(); + while (activeBucketIt.hasNext()) { Review comment: No, not this issue. I was referring to the issue that we might need to re-create the bucket in each checkpoint period if `OnCheckpointRollingPolicy` is used and we detect the inactiveness after sending the committables. In this case the bucket will always be detected as inactive, since all committables are sent out right before the detection. ########## File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java ########## @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.sink.writer; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.sink.FileSinkCommittable; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter; +import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; +import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writer implementation for {@link FileSink}.
+ */ +public class FileWriter<IN, BucketID> + implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> { + + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + + // ------------------------ configuration fields -------------------------- + + private final Path basePath; + + private final FileWriterBucketFactory<IN, BucketID> bucketFactory; + + private final BucketAssigner<IN, BucketID> bucketAssigner; + + private final BucketWriter<IN, BucketID> bucketWriter; + + private final RollingPolicy<IN, BucketID> rollingPolicy; + + // --------------------------- runtime fields ----------------------------- + + private final BucketerContext bucketerContext; + + private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets; + + private final OutputFileConfig outputFileConfig; + + // --------------------------- State Related Fields ----------------------------- + + private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer; + + /** + * A constructor creating a new empty bucket manager. + * + * @param basePath The base path for our buckets. + * @param bucketAssigner The {@link BucketAssigner} provided by the user. + * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets. + * @param bucketWriter The {@link BucketWriter} to be used when writing data. + * @param rollingPolicy The {@link RollingPolicy} as specified by the user. + */ + public FileWriter( + final Path basePath, + final BucketAssigner<IN, BucketID> bucketAssigner, + final FileWriterBucketFactory<IN, BucketID> bucketFactory, + final BucketWriter<IN, BucketID> bucketWriter, + final RollingPolicy<IN, BucketID> rollingPolicy, + final OutputFileConfig outputFileConfig) { + + this.basePath = checkNotNull(basePath); + this.bucketAssigner = checkNotNull(bucketAssigner); + this.bucketFactory = checkNotNull(bucketFactory); + this.bucketWriter = checkNotNull(bucketWriter); + this.rollingPolicy = checkNotNull(rollingPolicy); + + this.outputFileConfig = checkNotNull(outputFileConfig); + + this.activeBuckets = new HashMap<>(); + this.bucketerContext = new BucketerContext(); + + this.bucketStateSerializer = new FileWriterBucketStateSerializer<>( + bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), + bucketAssigner.getSerializer()); + } + + /** + * Initializes the state after recovery from a failure. + * + * <p>During this process: + * <ol> + * <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets. + * This guarantees that we do not overwrite valid data,</li> + * <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li> + * <li>we resume writing to the previous in-progress file of each bucket, and</li> + * <li>if we receive multiple states for the same bucket, we merge them.</li> + * </ol> + * + * @param bucketStates the state holding recovered state about active buckets. 
+ * + * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any + * in-progress/pending part files + */ + public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException { + for (FileWriterBucketState<BucketID> state : bucketStates) { + BucketID bucketId = state.getBucketId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring: {}", state); + } + + FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket( + bucketWriter, + rollingPolicy, + state, + outputFileConfig); + + updateActiveBucketId(bucketId, restoredBucket); + } + } + + private void updateActiveBucketId( + BucketID bucketId, + FileWriterBucket<IN, BucketID> restoredBucket) throws IOException { + final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId); + if (bucket != null) { + bucket.merge(restoredBucket); + } else { + activeBuckets.put(bucketId, restoredBucket); + } + } + + @Override + public void write(IN element, Context context) throws IOException { + // setting the values in the bucketer context + bucketerContext.update( + context.timestamp(), + context.currentWatermark()); + + final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext); + final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId); + bucket.write(element); + } + + @Override + public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException { + List<FileSinkCommittable> committables = new ArrayList<>(); + + // Every time before we prepare commit, we first check and remove the inactive + // buckets. Checking the activeness right before pre-committing avoid re-creating + // the bucket every time if the bucket use OnCheckpointingRollingPolicy. + Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt = + activeBuckets.entrySet().iterator(); + while (activeBucketIt.hasNext()) { Review comment: No, not this issue. I mean the issue that we might need to re-create the bucket in each checkpoint period if `OnCheckpointRollingPolicy` is used and we detect the inactiveness after sending the committables. In this case the bucket will always be detected as inactive, since all committables are sent out right before the detection. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
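As a rough illustration of the two points discussed in this thread (not the code from the PR): first, a minimal, self-contained sketch of the ordering described above, where inactive buckets are checked and removed before the remaining buckets emit their committables, so that under `OnCheckpointRollingPolicy` a bucket is not dropped and re-created on every checkpoint. The `Bucket` interface and the use of `String` committables are hypothetical stand-ins for `FileWriterBucket` and `FileSinkCommittable`.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    /** Hypothetical, simplified stand-in for FileWriterBucket, only for illustration. */
    interface Bucket {
        boolean isActive();                        // false once there is no in-progress part and no pending files
        List<String> prepareCommit(boolean flush); // the committables produced by this bucket
    }

    /** Sketch: remove inactive buckets first, then collect committables from the active ones. */
    class PrepareCommitSketch {

        private final Map<String, Bucket> activeBuckets = new HashMap<>();

        List<String> prepareCommit(boolean flush) {
            List<String> committables = new ArrayList<>();

            Iterator<Map.Entry<String, Bucket>> it = activeBuckets.entrySet().iterator();
            while (it.hasNext()) {
                Bucket bucket = it.next().getValue();
                if (!bucket.isActive()) {
                    // The bucket produced nothing since the last checkpoint: drop it before
                    // pre-committing, so a bucket that just handed out its committables is not
                    // immediately considered inactive and re-created on the next checkpoint.
                    it.remove();
                } else {
                    committables.addAll(bucket.prepareCommit(flush));
                }
            }
            return committables;
        }
    }

Second, a sketch of the "return the empty list, no retry" behaviour described for the committer, assuming a failed commit simply propagates its exception; `PendingFile` is a hypothetical stand-in, not the actual `BucketWriter` API.

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    /** Hypothetical stand-in for a recoverable pending file, only for illustration. */
    interface PendingFile {
        void commitAfterRecovery() throws IOException;
    }

    /** Sketch: commit every pending file and always report an empty "needs retry" list. */
    class NoRetryCommitSketch {

        List<PendingFile> commit(List<PendingFile> committables) throws IOException {
            for (PendingFile pendingFile : committables) {
                // commitAfterRecovery() carries the additional consistency checks,
                // so it is used for the normal path as well.
                pendingFile.commitAfterRecovery();
            }
            return Collections.emptyList(); // nothing is handed back for retry
        }
    }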