fapaul commented on code in PR #154: URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031443184
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.KafkaWriterState; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Describes the ownership model of transactional ids and with that ownership of the transactions. + * + * <p>A subtask that owns a transactional id is responsible for committing and aborting the + * transactions having that id. Only that subtask may create new ids. + * + * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + subtaskId + "-" + counter + * </code>. The prefix is given by the user, the subtask id is defined through the ownership model + * and the counter through the {@link + * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}. 
+ * + * <p>For all strategies ownership is extrapolated for subtask ids beyond the currently known + * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has + * been taken. Consider an application that runs with 3 subtasks and has checkpointed. Later, it is + * upscaled to 5 but then a failure happens. We need to have at least 5 open transactions. If the + * application is finally resumed from the checkpoint with 3 subtasks again, these 3 subtasks need + * to assume ownership of the remaining 2. + */ +@Internal +public enum TransactionOwnership { + /** + * The ownership is determined by the current subtask ID. Ownership is extrapolated by + * extracting the original subtask id of the ongoing transactions and applying modulo on the + * current parallelism. + */ + IMPLICIT_BY_SUBTASK_ID { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (!recoveredStates.isEmpty()) { + checkForMigration(currentSubtaskId, recoveredStates); + } + + return new int[] {currentSubtaskId}; + } + + private void checkForMigration( + int currentSubtaskId, Collection<KafkaWriterState> recoveredStates) { + TransactionOwnership oldOwnership = + recoveredStates.stream() + .map(KafkaWriterState::getTransactionOwnership) + .findFirst() + .orElseThrow(); + if (oldOwnership == this) { + return; + } + + Set<Integer> ownedSubtaskIds = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .boxed() + .collect(Collectors.toSet()); + if (!ownedSubtaskIds.contains(currentSubtaskId) + && !ownedSubtaskIds.equals(Set.of(UNKNOWN))) { + int numShares = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .findFirst() + .orElse(UNKNOWN); + throw new IllegalStateException( + "Attempted to switch back to INCREMENTING from a transaction naming strategy that uses the new writer state. 
A possible way to safely do it is to scale back to the maximum parallelism of " + + numShares); + } + } + + @Override + public int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + return currentParallelism; + } + }, + /** + * The ownership is determined by the writer state that is recovered. Each writer may have + * multiple states each with a different subtask id (e.g. when downscaling). + * + * <p>Additionally, the maximum parallelism that has been observed is stored in the state and + * used to extrapolate ownership. + * + * <p>This ownership model has two assumptions about the state assignment: + * + * <ul> + * <li>State is assigned first to lower subtask ids and then to higher ones. In the upscaling + * case, from oldP to newP, only tasks [0; oldP) are assigned. [oldP; newP) are not + * assigned any state. + * <li>State is uniformly assigned. In the upscaling case, none of the tasks have more than 1 + * state assigned. + * </ul> + * + * <p>Hence, the state is consecutively assigned to the subtasks from low to high. + * + * <p>With these assumptions, this ownership model is able to recover from writer states with + * subtask id + max parallelism: + * + * <ul> + * <li>If there is state, we can extract the owned subtask ids and the max parallelism from + * the state. + * <li>If there is no state, we can use the current subtask id and the max parallelism from + * the current parallelism. The current subtask id cannot possibly be owned already. The + * max parallelism in any state must be lower than the current max parallelism. + * <li>Hence, no subtask id is owned by more than one task and all tasks have the same max + * parallelism. + * <li>Since all tasks have shared knowledge, we can exclusively assign all transactional ids. 
+ * <li>Since each subtask owns at least one transactional id, we can safely create new + * transactional ids while other subtasks are still aborting their transactions. + * </ul> + */ + EXPLICIT_BY_WRITER_STATE { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (recoveredStates.isEmpty()) { + return new int[] {currentSubtaskId}; + } else { + int[] ownedSubtaskIds = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .sorted() + .toArray(); + assertKnown(ownedSubtaskIds[0]); + + int maxParallelism = + recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks(); Review Comment: Nit: Should we add a simple check that all writer states are expected to have the same number? The naming is also slightly confusing since both the `TransactionOwnership` and the state define a method with the same name. ########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java: ########## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.kafka.sink.KafkaWriterState; + +import java.util.Collection; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Describes the ownership model of transactional ids and with that ownership of the transactions. + * + * <p>A subtask that owns a transactional id is responsible for committing and aborting the + * transactions having that id. Only that subtask may create new ids. + * + * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + subtaskId + "-" + counter + * </code>. The prefix is given by the user, the subtask id is defined through the ownership model + * and the counter through the {@link + * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}. + * + * <p>For all strategies ownership is extrapolated for subtask ids beyond the currently known + * subtasks. This is necessary to support cases of intermediate upscaling where no checkpoint has + * been taken. Consider an application that runs with 3 subtasks and has checkpointed. Later, it is + * upscaled to 5 but then a failure happens. We need to have at least 5 open transactions. If the + * application is finally resumed from the checkpoint with 3 subtasks again, these 3 subtasks need + * to assume ownership of the remaining 2. + */ +@Internal +public enum TransactionOwnership { + /** + * The ownership is determined by the current subtask ID. Ownership is extrapolated by + * extracting the original subtask id of the ongoing transactions and applying modulo on the + * current parallelism. 
+ */ + IMPLICIT_BY_SUBTASK_ID { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (!recoveredStates.isEmpty()) { + checkForMigration(currentSubtaskId, recoveredStates); + } + + return new int[] {currentSubtaskId}; + } + + private void checkForMigration( + int currentSubtaskId, Collection<KafkaWriterState> recoveredStates) { + TransactionOwnership oldOwnership = + recoveredStates.stream() + .map(KafkaWriterState::getTransactionOwnership) + .findFirst() + .orElseThrow(); + if (oldOwnership == this) { + return; + } + + Set<Integer> ownedSubtaskIds = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .boxed() + .collect(Collectors.toSet()); + if (!ownedSubtaskIds.contains(currentSubtaskId) + && !ownedSubtaskIds.equals(Set.of(UNKNOWN))) { + int numShares = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .findFirst() + .orElse(UNKNOWN); + throw new IllegalStateException( + "Attempted to switch back to INCREMENTING from a transaction naming strategy that uses the new writer state. A possible way to safely do it is to scale back to the maximum parallelism of " + + numShares); + } + } + + @Override + public int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + return currentParallelism; + } + }, + /** + * The ownership is determined by the writer state that is recovered. Each writer may have + * multiple states each with a different subtask id (e.g. when downscaling). + * + * <p>Additionally, the maximum parallelism that has been observed is stored in the state and + * used to extrapolate ownership. + * + * <p>This ownership model has two assumptions about the state assignment: + * + * <ul> + * <li>State is assigned first to lower subtask ids and then to higher ones. 
In the upscaling + * case, from oldP to newP, only tasks [0; oldP) are assigned. [oldP; newP) are not + * assigned any state. + * <li>State is uniformly assigned. In the upscaling case, none of the tasks have more than 1 + * state assigned. + * </ul> + * + * <p>Hence, the state is consecutively assigned to the subtasks from low to high. + * + * <p>With these assumptions, this ownership model is able to recover from writer states with + * subtask id + max parallelism: + * + * <ul> + * <li>If there is state, we can extract the owned subtask ids and the max parallelism from + * the state. + * <li>If there is no state, we can use the current subtask id and the max parallelism from + * the current parallelism. The current subtask id cannot possibly be owned already. The + * max parallelism in any state must be lower than the current max parallelism. + * <li>Hence, no subtask id is owned by more than one task and all tasks have the same max + * parallelism. + * <li>Since all tasks have shared knowledge, we can exclusively assign all transactional ids. + * <li>Since each subtask owns at least one transactional id, we can safely create new + * transactional ids while other subtasks are still aborting their transactions. 
+ * </ul> + */ + EXPLICIT_BY_WRITER_STATE { + @Override + public int[] getOwnedSubtaskIds( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (recoveredStates.isEmpty()) { + return new int[] {currentSubtaskId}; + } else { + int[] ownedSubtaskIds = + recoveredStates.stream() + .mapToInt(KafkaWriterState::getOwnedSubtaskId) + .sorted() + .toArray(); + assertKnown(ownedSubtaskIds[0]); + + int maxParallelism = + recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks(); + // Assumption of the ownership model: state is distributed consecutively across the + // subtasks starting with subtask 0 + checkState(currentSubtaskId < maxParallelism, "State not consecutively assigned"); + + return ownedSubtaskIds; + } + } + + @Override + public int getTotalNumberOfOwnedSubtasks( + int currentSubtaskId, + int currentParallelism, + Collection<KafkaWriterState> recoveredStates) { + if (recoveredStates.isEmpty()) { + return currentParallelism; + } + int totalNumberOfOwnedSubtasks = + recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks(); + assertKnown(totalNumberOfOwnedSubtasks); + + if (currentParallelism >= totalNumberOfOwnedSubtasks) { + // Assumption of the ownership model: state is distributed consecutively across the + // subtasks starting with subtask 0 + checkState(recoveredStates.size() == 1, "Not uniformly assigned"); Review Comment: Is this an upscale scenario? Shouldn't the number of states assigned to each subtask be 0 or 1? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org