fapaul commented on code in PR #25456:
URL: https://github.com/apache/flink/pull/25456#discussion_r1803152409
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java: ##########
@@ -47,10 +47,50 @@ import java.util.Collections;
 import java.util.List;
 
+import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
-        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {
+/**
+ * Implements the {@code GlobalCommitter}.
+ *
+ * <p>This operator usually trails behind a {@code CommitterOperator}. In this case, the global
+ * committer will receive committables from the committer operator through {@link
+ * #processElement(StreamRecord)}. Once all committables from all subtasks have been received, the
+ * global committer will commit them.
+ *
+ * <p>That means that the global committer will not wait for {@link
+ * #notifyCheckpointComplete(long)}. In many cases, it receives the callback before the actual
+ * committables anyway. So it would effectively globally commit one checkpoint later.
+ *
+ * <p>However, we can leverage the following observation: the global committer will only receive
+ * committables iff the respective checkpoint was completed and upstream committers received the
+ * {@link #notifyCheckpointComplete(long)}. So by waiting for all committables of a given
+ * checkpoint, we implicitly know that the checkpoint was successful and the global committer is
+ * supposed to globally commit.
+ *
+ * <p>Note that committables of checkpoint X are not checkpointed in X because the global committer
+ * is trailing behind the checkpoint. They are replayed from the committer state in case of an
+ * error. The state only includes incomplete checkpoints coming from upstream committers not
+ * receiving {@link #notifyCheckpointComplete(long)}. All committables received are successful.
+ *
+ * <p>In rare cases, the GlobalCommitterOperator may be connected to a writer directly. In this

Review Comment:
   In theory, the global committer can sit at any point of the custom topology, hooked behind some custom operator.
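To make the review comment concrete: in a custom post-commit topology, the global committer can end up wired behind an arbitrary user operator rather than directly behind the committer. The sketch below is illustrative and not part of this PR; it assumes the `StandardSinkTopologies.addGlobalCommitter(committables, committerFactory, committableSerializer)` overload, and the filter stands in for any custom operator.

    import org.apache.flink.api.connector.sink2.Committer;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
    import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.function.SerializableSupplier;

    /** Illustrative wiring of a post-commit topology; not part of the PR. */
    final class CustomPostCommitTopologyExample {

        static <CommT> void addGlobalCommitterBehindCustomOperator(
                DataStream<CommittableMessage<CommT>> committables,
                SerializableSupplier<Committer<CommT>> committerFactory,
                SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
            // Any custom operator can sit between the upstream committer (or writer) and the
            // global committer; here it is a pass-through filter with its own operator name.
            DataStream<CommittableMessage<CommT>> rerouted =
                    committables.filter(message -> true).name("custom-post-commit-step");
            // Assumed overload: committables stream + committer factory + committable serializer.
            StandardSinkTopologies.addGlobalCommitter(
                    rerouted, committerFactory, committableSerializer);
        }
    }

In such a topology the planner cannot assume that the global committer's direct input is a CommitterOperator, which is exactly the situation the javadoc's last paragraph and this comment are about.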
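Stepping back to the mechanism the new javadoc describes, here is a toy model (plain Java, no Flink types, all names illustrative) of the commit-on-input idea: per checkpoint, track which upstream subtasks have delivered their committables and treat completeness of the input as the commit signal, instead of waiting for `notifyCheckpointComplete`.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Toy model: once every upstream subtask has reported for a checkpoint, the upstream
    // committers must already have seen notifyCheckpointComplete for it, so it is safe to
    // trigger the global commit.
    class CommitOnInputModel {
        private final int upstreamParallelism;
        private final Map<Long, Set<Integer>> receivedSubtasks = new HashMap<>();

        CommitOnInputModel(int upstreamParallelism) {
            this.upstreamParallelism = upstreamParallelism;
        }

        /** Returns true when committables from all subtasks arrived for the checkpoint. */
        boolean onCommittableSummary(long checkpointId, int subtaskId) {
            Set<Integer> subtasks =
                    receivedSubtasks.computeIfAbsent(checkpointId, id -> new HashSet<>());
            subtasks.add(subtaskId);
            return subtasks.size() == upstreamParallelism;
        }
    }

The real operator additionally tracks how many committables each subtask announced; the point here is only that a complete set of inputs doubles as the success signal for the checkpoint.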
########## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java: ##########
@@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception
                         });
             lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commit(lastCompletedCheckpointId);
+            commit();
         }
     }
 
+    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
+            getCommitterStateSerializer() {
+        final CommittableCollectorSerializer<CommT> committableCollectorSerializer =
+                new CommittableCollectorSerializer<>(
+                        committableSerializer,
+                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+                        metricGroup);
+        return new GlobalCommitterSerializer<>(
+                committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commit(lastCompletedCheckpointId);
-    }
-
-    private void commit(long checkpointId) throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> checkpoint :
-                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
-            checkpoint.commit(committer);
+        if (!commitOnInput) {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);

Review Comment:
   Nit: Let's move the calculation of `lastCompletedCheckpointId` into the `commit()` method and add a parameter to the `commit()` method for the current checkpoint. (A rough sketch of this refactor follows the translator diff below.)

########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java: ##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.translators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.graph.TransformationTranslator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
+import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
+import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Set;
+
+import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME;
+
+/**
+ * A {@link TransformationTranslator} for the {@link GlobalCommitterOperator}. The main purpose is
+ * to detect whether we set {@link GlobalCommitterOperator#commitOnInput} or not.
+ */
+@Internal
+public class GlobalCommitterTransformationTranslator<CommT>
+        implements TransformationTranslator<Void, GlobalCommitterTransform<CommT>> {
+
+    @Override
+    public Collection<Integer> translateForBatch(
+            GlobalCommitterTransform<CommT> transformation, Context context) {
+        return translateInternal(transformation, true);
+    }
+
+    @Override
+    public Collection<Integer> translateForStreaming(
+            GlobalCommitterTransform<CommT> transformation, Context context) {
+        return translateInternal(transformation, false);
+    }
+
+    private Collection<Integer> translateInternal(
+            GlobalCommitterTransform<CommT> globalCommitterTransform, boolean batch) {
+        DataStream<CommittableMessage<CommT>> inputStream =
+                globalCommitterTransform.getInputStream();
+        boolean checkpointingEnabled =
+                inputStream
+                        .getExecutionEnvironment()
+                        .getCheckpointConfig()
+                        .isCheckpointingEnabled();
+        boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream);
+
+        // Create a global shuffle and add the global committer with parallelism 1.
+        final PhysicalTransformation<Void> transformation =
+                (PhysicalTransformation<Void>)
+                        inputStream
+                                .global()
+                                .transform(
+                                        GLOBAL_COMMITTER_TRANSFORMATION_NAME,
+                                        Types.VOID,
+                                        new GlobalCommitterOperator<>(
+                                                globalCommitterTransform.getCommitterFactory(),
+                                                globalCommitterTransform.getCommittableSerializer(),
+                                                commitOnInput))
+                                .getTransformation();
+        transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
+        transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
+        transformation.setParallelism(1);
+        transformation.setMaxParallelism(1);
+        return Collections.emptyList();
+    }
+
+    /**
+     * Looks for a committer in the pipeline and aborts on writer. The GlobalCommitter behaves
+     * differently if there is a committer after the writer.
+     */
+    private static boolean hasUpstreamCommitter(DataStream<?> ds) {

Review Comment:
   How safe is this really?
   Users could have used the global committer with different custom operators in front of it that follow either writer or committer semantics.
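The body of `hasUpstreamCommitter` is cut off above. To frame the concern, here is an illustrative guess (not the PR code) at what such a pipeline walk looks like, using only types already imported in the file: a breadth-first search over the input transformations that returns true once a `CommitterOperatorFactory` is found and stops descending at a `SinkWriterOperatorFactory`.

    private static boolean hasUpstreamCommitter(DataStream<?> ds) {
        // Illustrative sketch only: walk the transformation graph upstream from the given stream.
        Queue<Transformation<?>> pending = new ArrayDeque<>();
        Set<Transformation<?>> visited = new HashSet<>();
        pending.add(ds.getTransformation());
        while (!pending.isEmpty()) {
            Transformation<?> current = pending.poll();
            if (!visited.add(current)) {
                continue;
            }
            if (current instanceof OneInputTransformation) {
                StreamOperatorFactory<?> factory =
                        ((OneInputTransformation<?, ?>) current).getOperatorFactory();
                if (factory instanceof CommitterOperatorFactory) {
                    return true; // found a committer upstream
                }
                if (factory instanceof SinkWriterOperatorFactory) {
                    continue; // stop descending this branch at the writer
                }
            }
            pending.addAll(current.getInputs());
        }
        return false;
    }

Whatever the exact implementation, the review comment's point stands: a custom operator in front of the global committer may itself follow writer or committer semantics without using either of these factories, so a factory-type check cannot reliably decide `commitOnInput` for custom topologies.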
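Back to the earlier nit on `notifyCheckpointComplete`: a rough sketch (illustrative, not the PR code) of letting `commit()` own the `lastCompletedCheckpointId` bookkeeping and take the triggering checkpoint as a parameter.

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (!commitOnInput) {
            commit(checkpointId);
        }
    }

    private void commit(long checkpointId) throws IOException, InterruptedException {
        // The bookkeeping moves here, so every caller just states which checkpoint triggered it.
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
        // ... the rest of the commit logic from the PR stays unchanged ...
    }

The restore path in `initializeState` would then pass `context.getRestoredCheckpointId().getAsLong()` instead of assigning the field before calling `commit()`.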
########## flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java: ##########
@@ -168,13 +169,33 @@ public Collection<CommittableWithLineage<CommT>> commit(Committer<CommT> committ
         return committed;
     }
 
-    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean onlyIfFullyReceived) {
+    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean assertFull) {
         return subtasksCommittableManagers.values().stream()
-                .filter(subtask -> !onlyIfFullyReceived || subtask.hasReceivedAll())
+                .peek(subtask -> assertReceivedAll(assertFull, subtask))
                 .flatMap(SubtaskCommittableManager::getPendingRequests)
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Sinks don't use unaligned checkpoints, so we receive all committables of a given upstream

Review Comment:
   This javadoc is now a bit confusing since the global committer and the committer follow different committing and receiving strategies.

########## flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java: ##########
@@ -127,38 +165,67 @@ public void initializeState(StateInitializationContext context) throws Exception
                         });
             lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
             // try to re-commit recovered transactions as quickly as possible
-            commit(lastCompletedCheckpointId);
+            commit();
         }
     }
 
+    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>>
+            getCommitterStateSerializer() {
+        final CommittableCollectorSerializer<CommT> committableCollectorSerializer =
+                new CommittableCollectorSerializer<>(
+                        committableSerializer,
+                        getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
+                        getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(),
+                        metricGroup);
+        return new GlobalCommitterSerializer<>(
+                committableCollectorSerializer, globalCommittableSerializer, metricGroup);
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commit(lastCompletedCheckpointId);
-    }
-
-    private void commit(long checkpointId) throws IOException, InterruptedException {
-        for (CheckpointCommittableManager<CommT> checkpoint :
-                committableCollector.getCheckpointCommittablesUpTo(checkpointId)) {
-            checkpoint.commit(committer);
+        if (!commitOnInput) {
+            lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
+            commit();
         }
-        committableCollector.compact();
     }
 
-    @Override
-    public void endInput() throws Exception {
-        final CheckpointCommittableManager<CommT> endOfInputCommittable =
-                committableCollector.getEndOfInputCommittable();
-        if (endOfInputCommittable != null) {
-            do {
-                endOfInputCommittable.commit(committer);
-            } while (!committableCollector.isFinished());
-        }
+    private void commit() throws IOException, InterruptedException {
+        // this is true for the last commit and we need to make sure that all committables are
+        // indeed committed as this function will never be invoked again
+        boolean waitForAllCommitted =
+                lastCompletedCheckpointId == EOI

Review Comment:
   Can the following scenario happen?
   On `EOI` and `commitOnInput=true`, we start running an infinite loop because we have not received all committables for EOI from the upstream task (the committer).
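One possible way to rule that scenario out, sketched here purely for illustration (not part of the PR; a `hasReceivedAll()` check on the checkpoint-level manager is assumed): only insist on draining everything at EOI once the EOI committables have actually been fully received, otherwise return and let a later `processElement()` trigger `commit()` again.

    private void commit() throws IOException, InterruptedException {
        // Hypothetical guard: never block waiting for committables that have not arrived yet.
        CheckpointCommittableManager<CommT> endOfInput =
                committableCollector.getEndOfInputCommittable();
        boolean waitForAllCommitted =
                lastCompletedCheckpointId == EOI
                        && endOfInput != null
                        && endOfInput.hasReceivedAll(); // assumed helper, for illustration only
        // ... the commit/retry loop from the PR would follow, guarded by waitForAllCommitted ...
    }

In other words, if the operator only flips into wait-for-all mode after the EOI summaries are complete, the loop cannot start before the upstream committer has delivered everything.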