fapaul commented on code in PR #154:
URL: 
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2031443184


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Describes the ownership model of transactional ids and with that ownership 
of the transactions.
+ *
+ * <p>A subtask that owns a transactional id is responsible for committing and 
aborting the
+ * transactions having that id. Only that subtask may create new ids.
+ *
+ * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + 
subtaskId + "-" + counter
+ * </code>. The prefix is given by the user, the subtask id is defined through 
the ownership model
+ * and the counter through the {@link
+ * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}.
+ *
+ * <p>For all strategies ownership is extrapolated for subtask ids beyond the 
currently known
+ * subtasks. This is necessary to support cases of intermediate upscaling 
where no checkpoint has
+ * been taken. Consider an application that runs with 3 subtasks and 
checkpointed. Later, its
+ * upscaled to 5 but then a failure happens. We need to have at least 5 open 
transactions. If the
+ * application is finally resumed from the checkpoint with 3 subtasks again. 
These 3 subtasks need
+ * to assume ownership of the remaining 2.
+ */
+@Internal
+public enum TransactionOwnership {
+    /**
+     * The ownership is determined by the current subtask ID. Ownership is 
extrapolated by
+     * extracting the original subtask id of the ongoing transactions and 
applying modulo on the
+     * current parallelism.
+     */
+    IMPLICIT_BY_SUBTASK_ID {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (!recoveredStates.isEmpty()) {
+                checkForMigration(currentSubtaskId, recoveredStates);
+            }
+
+            return new int[] {currentSubtaskId};
+        }
+
+        private void checkForMigration(
+                int currentSubtaskId, Collection<KafkaWriterState> 
recoveredStates) {
+            TransactionOwnership oldOwnership =
+                    recoveredStates.stream()
+                            .map(KafkaWriterState::getTransactionOwnership)
+                            .findFirst()
+                            .orElseThrow();
+            if (oldOwnership == this) {
+                return;
+            }
+
+            Set<Integer> ownedSubtaskIds =
+                    recoveredStates.stream()
+                            .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                            .boxed()
+                            .collect(Collectors.toSet());
+            if (!ownedSubtaskIds.contains(currentSubtaskId)
+                    && !ownedSubtaskIds.equals(Set.of(UNKNOWN))) {
+                int numShares =
+                        recoveredStates.stream()
+                                .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                                .findFirst()
+                                .orElse(UNKNOWN);
+                throw new IllegalStateException(
+                        "Attempted to switch back to INCREMENTING from a 
transaction naming strategy that uses the new writer state. A possible way to 
safely do it is to scale back to the maximum parallelism of "
+                                + numShares);
+            }
+        }
+
+        @Override
+        public int getTotalNumberOfOwnedSubtasks(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            return currentParallelism;
+        }
+    },
+    /**
+     * The ownership is determined by the writer state that is recovered. Each 
writer may have
+     * multiple states each with a different subtask id (e.g. when 
downscaling).
+     *
+     * <p>Additionally, the maximum parallelism that has been observed is 
stored in the state and
+     * used to extrapolate ownership.
+     *
+     * <p>This ownership model has two assumption of the state assignment:
+     *
+     * <ul>
+     *   <li>State is assigned first to lower subtask ids and then to higher 
ones. In the upscaling
+     *       case, from oldP to newP, only tasks [0; oldP) are assigned. 
[oldP; newP) are not
+     *       assigned any state.
+     *   <li>State is uniformly assigned. In the upscaling case, none of the 
tasks have more than 1
+     *       state assigned.
+     * </ul>
+     *
+     * <p>Hence, the state is consecutively assigned to the subtasks from low 
to high.
+     *
+     * <p>With these assumption, this ownership model is able to recover from 
writer states with
+     * subtask id + max parallelism:
+     *
+     * <ul>
+     *   <li>If there is state, we can extract the owned subtask ids and the 
max parallelism from
+     *       the state.
+     *   <li>If there is no state, we can use the current subtask id and the 
max parallelism from
+     *       the current parallelism. The current subtask id cannot possibly 
be owned already. The
+     *       max parallelism in any state must be lower than the current max 
parallelism.
+     *   <li>Hence, no subtask id is owned by more than one task and all tasks 
have the same max
+     *       parallelism.
+     *   <li>Since all tasks have shared knowledge, we can exclusively assign 
all transactional ids.
+     *   <li>Since each subtask owns at least one transactional id, we can 
safely create new
+     *       transactional ids while other subtasks are still aborting their 
transactions.
+     * </ul>
+     */
+    EXPLICIT_BY_WRITER_STATE {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (recoveredStates.isEmpty()) {
+                return new int[] {currentSubtaskId};
+            } else {
+                int[] ownedSubtaskIds =
+                        recoveredStates.stream()
+                                .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                                .sorted()
+                                .toArray();
+                assertKnown(ownedSubtaskIds[0]);
+
+                int maxParallelism =
+                        
recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks();

Review Comment:
   Nit: Should we add a simple check that all writes are expected to have the 
same number?
   
   The naming is also slightly confusing since the `TransactionOwnership` and 
the state implement the same method.



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionOwnership.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.sink.KafkaWriterState;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.sink.KafkaWriterState.UNKNOWN;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Describes the ownership model of transactional ids and with that ownership 
of the transactions.
+ *
+ * <p>A subtask that owns a transactional id is responsible for committing and 
aborting the
+ * transactions having that id. Only that subtask may create new ids.
+ *
+ * <p>Transactional ids have the form <code>transactionalIdPrefix + "-" + 
subtaskId + "-" + counter
+ * </code>. The prefix is given by the user, the subtask id is defined through 
the ownership model
+ * and the counter through the {@link
+ * org.apache.flink.connector.kafka.sink.TransactionNamingStrategy}.
+ *
+ * <p>For all strategies ownership is extrapolated for subtask ids beyond the 
currently known
+ * subtasks. This is necessary to support cases of intermediate upscaling 
where no checkpoint has
+ * been taken. Consider an application that runs with 3 subtasks and 
checkpointed. Later, its
+ * upscaled to 5 but then a failure happens. We need to have at least 5 open 
transactions. If the
+ * application is finally resumed from the checkpoint with 3 subtasks again. 
These 3 subtasks need
+ * to assume ownership of the remaining 2.
+ */
+@Internal
+public enum TransactionOwnership {
+    /**
+     * The ownership is determined by the current subtask ID. Ownership is 
extrapolated by
+     * extracting the original subtask id of the ongoing transactions and 
applying modulo on the
+     * current parallelism.
+     */
+    IMPLICIT_BY_SUBTASK_ID {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (!recoveredStates.isEmpty()) {
+                checkForMigration(currentSubtaskId, recoveredStates);
+            }
+
+            return new int[] {currentSubtaskId};
+        }
+
+        private void checkForMigration(
+                int currentSubtaskId, Collection<KafkaWriterState> 
recoveredStates) {
+            TransactionOwnership oldOwnership =
+                    recoveredStates.stream()
+                            .map(KafkaWriterState::getTransactionOwnership)
+                            .findFirst()
+                            .orElseThrow();
+            if (oldOwnership == this) {
+                return;
+            }
+
+            Set<Integer> ownedSubtaskIds =
+                    recoveredStates.stream()
+                            .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                            .boxed()
+                            .collect(Collectors.toSet());
+            if (!ownedSubtaskIds.contains(currentSubtaskId)
+                    && !ownedSubtaskIds.equals(Set.of(UNKNOWN))) {
+                int numShares =
+                        recoveredStates.stream()
+                                .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                                .findFirst()
+                                .orElse(UNKNOWN);
+                throw new IllegalStateException(
+                        "Attempted to switch back to INCREMENTING from a 
transaction naming strategy that uses the new writer state. A possible way to 
safely do it is to scale back to the maximum parallelism of "
+                                + numShares);
+            }
+        }
+
+        @Override
+        public int getTotalNumberOfOwnedSubtasks(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            return currentParallelism;
+        }
+    },
+    /**
+     * The ownership is determined by the writer state that is recovered. Each 
writer may have
+     * multiple states each with a different subtask id (e.g. when 
downscaling).
+     *
+     * <p>Additionally, the maximum parallelism that has been observed is 
stored in the state and
+     * used to extrapolate ownership.
+     *
+     * <p>This ownership model has two assumption of the state assignment:
+     *
+     * <ul>
+     *   <li>State is assigned first to lower subtask ids and then to higher 
ones. In the upscaling
+     *       case, from oldP to newP, only tasks [0; oldP) are assigned. 
[oldP; newP) are not
+     *       assigned any state.
+     *   <li>State is uniformly assigned. In the upscaling case, none of the 
tasks have more than 1
+     *       state assigned.
+     * </ul>
+     *
+     * <p>Hence, the state is consecutively assigned to the subtasks from low 
to high.
+     *
+     * <p>With these assumption, this ownership model is able to recover from 
writer states with
+     * subtask id + max parallelism:
+     *
+     * <ul>
+     *   <li>If there is state, we can extract the owned subtask ids and the 
max parallelism from
+     *       the state.
+     *   <li>If there is no state, we can use the current subtask id and the 
max parallelism from
+     *       the current parallelism. The current subtask id cannot possibly 
be owned already. The
+     *       max parallelism in any state must be lower than the current max 
parallelism.
+     *   <li>Hence, no subtask id is owned by more than one task and all tasks 
have the same max
+     *       parallelism.
+     *   <li>Since all tasks have shared knowledge, we can exclusively assign 
all transactional ids.
+     *   <li>Since each subtask owns at least one transactional id, we can 
safely create new
+     *       transactional ids while other subtasks are still aborting their 
transactions.
+     * </ul>
+     */
+    EXPLICIT_BY_WRITER_STATE {
+        @Override
+        public int[] getOwnedSubtaskIds(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (recoveredStates.isEmpty()) {
+                return new int[] {currentSubtaskId};
+            } else {
+                int[] ownedSubtaskIds =
+                        recoveredStates.stream()
+                                .mapToInt(KafkaWriterState::getOwnedSubtaskId)
+                                .sorted()
+                                .toArray();
+                assertKnown(ownedSubtaskIds[0]);
+
+                int maxParallelism =
+                        
recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks();
+                // Assumption of the ownership model: state is distributed 
consecutively across the
+                // subtasks starting with subtask 0
+                checkState(currentSubtaskId < maxParallelism, "State not 
consecutively assigned");
+
+                return ownedSubtaskIds;
+            }
+        }
+
+        @Override
+        public int getTotalNumberOfOwnedSubtasks(
+                int currentSubtaskId,
+                int currentParallelism,
+                Collection<KafkaWriterState> recoveredStates) {
+            if (recoveredStates.isEmpty()) {
+                return currentParallelism;
+            }
+            int totalNumberOfOwnedSubtasks =
+                    
recoveredStates.iterator().next().getTotalNumberOfOwnedSubtasks();
+            assertKnown(totalNumberOfOwnedSubtasks);
+
+            if (currentParallelism >= totalNumberOfOwnedSubtasks) {
+                // Assumption of the ownership model: state is distributed 
consecutively across the
+                // subtasks starting with subtask 0
+                checkState(recoveredStates.size() == 1, "Not uniformly 
assigned");

Review Comment:
   Is this an upscale scenario? 
   
   Shouldn't the state assignment be 0 or 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to