fapaul commented on code in PR #154:
URL: 
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r1979035891


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java:
##########
@@ -47,8 +54,15 @@ public byte[] serialize(KafkaWriterState state) throws IOException {
     public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
         try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 final DataInputStream in = new DataInputStream(bais)) {
-            final String transactionalIdPrefx = in.readUTF();
-            return new KafkaWriterState(transactionalIdPrefx);
+            final String transactionalIdPrefix = in.readUTF();
+            final Collection<CheckpointTransaction> ongoingTransactions = new ArrayList<>();
+            if (version >= 2) {

Review Comment:
   Nit: Isn't it better to assert on version 2 and fail for other versions?
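   A minimal standalone sketch of the suggested guard, failing fast on unknown versions instead of silently falling through. The class and constant names are illustrative stand-ins, not the connector's actual serializer code:

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;

   // Hypothetical sketch: reject versions the serializer does not know about.
   public class VersionGuardSketch {
       static final int CURRENT_VERSION = 2;

       static String deserialize(int version, byte[] serialized) throws IOException {
           if (version < 1 || version > CURRENT_VERSION) {
               // Fail fast instead of deserializing a format we do not understand.
               throw new IOException("Unsupported KafkaWriterState version: " + version);
           }
           try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
               return in.readUTF();
           }
       }

       public static void main(String[] args) throws IOException {
           ByteArrayOutputStream baos = new ByteArrayOutputStream();
           try (DataOutputStream out = new DataOutputStream(baos)) {
               out.writeUTF("kafka-sink-prefix");
           }
           System.out.println(deserialize(2, baos.toByteArray()));
           try {
               deserialize(3, baos.toByteArray());
           } catch (IOException expected) {
               System.out.println("rejected version 3");
           }
       }
   }
   ```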



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java:
##########
@@ -53,6 +55,20 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             }
             return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
         }
+    },
+    POOLING {
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+                Context context) {
+            Collection<String> usedTransactionalIds = context.getOngoingTransactions();
+            for (int offset = 0; ; offset++) {
+                String transactionalIdCandidate = context.buildTransactionalId(offset);
+                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
+                    continue;
+                }
+                return context.getProducer(transactionalIdCandidate);

Review Comment:
   Do we have a log line somewhere that indicates how big the pool is? Since a growing pool might be an indicator of a serious issue, we should probably log on every new producer creation.
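   A rough sketch of what such a log line could look like. The pool class, method names, and id scheme here are hypothetical stand-ins for the connector's producer pool, and `System.out` stands in for the SLF4J logger:

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical sketch: log the pool size whenever a new transactional id
   // (and hence a new producer) is created.
   public class PoolingLogSketch {
       private final Set<String> usedTransactionalIds = new HashSet<>();

       String acquire(String prefix) {
           for (int offset = 0; ; offset++) {
               String candidate = prefix + "-" + offset;
               if (usedTransactionalIds.add(candidate)) {
                   // A steadily growing pool can indicate stuck transactions,
                   // so surface the size on every new producer creation.
                   System.out.printf(
                           "Creating producer %s; pool size is now %d%n",
                           candidate, usedTransactionalIds.size());
                   return candidate;
               }
           }
       }

       public static void main(String[] args) {
           PoolingLogSketch pool = new PoolingLogSketch();
           pool.acquire("tx");
           pool.acquire("tx");
       }
   }
   ```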



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkOptions.java:
##########
@@ -72,7 +72,15 @@ public enum TransactionNamingStrategy {
          * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
          */
         INCREMENTING(
-                TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);
+                TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING),
+        /**
+         * This strategy reuses transaction names. It is more resource-friendly than {@link
+         * #INCREMENTING} on the Kafka broker.
+         *
+         * <p>It's a new strategy introduced in flink-connector-kafka 4.X. It requires Kafka 3.0+
+         * and additional permissions.

Review Comment:
   Can we list the additional operations in the Javadoc, e.g., the calls made to the admin client?



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java:
##########
@@ -107,6 +119,64 @@ private int abortTransactionOfSubtask(String prefix, int subtaskId, Context cont
             }
             return numTransactionAborted;
         }
+    },
+    LISTING {
+        private final EnumSet<TransactionState> abortableStates =
+                EnumSet.complementOf(
+                        EnumSet.of(
+                                TransactionState.COMPLETE_ABORT,
+                                TransactionState.COMPLETE_COMMIT,
+                                TransactionState.PREPARE_COMMIT));
+
+        @Override
+        public void abortTransactions(Context context) {
+
+            Collection<TransactionListing> openTransactionsForTopics =
+                    AdminUtils.getOpenTransactionsForTopics(
+                            context.getAdminClient(), context.getTopicNames());
+
+            // This list could be huge if TransactionNamingStrategy#INCREMENTING is used, so cap it
+            LOG.trace(

Review Comment:
   Let's warn the user how many transactions will be aborted, so unwanted behavior is spotted earlier.
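   A sketch of the suggested warning, with the count up front and a capped sample of the ids so the log stays bounded even under `INCREMENTING`. The method and parameter names are illustrative, and `System.out` stands in for the SLF4J logger:

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch: warn about the abort batch size before acting on it.
   public class AbortWarnSketch {
       static void warnBeforeAbort(List<String> openTransactionalIds) {
           if (!openTransactionalIds.isEmpty()) {
               // Surface the count at WARN level so an unexpectedly large abort
               // batch is noticed before the transactions are gone; only print
               // a capped sample of the ids to keep the log line bounded.
               System.out.printf(
                       "Aborting %d open transaction(s), first ids: %s%n",
                       openTransactionalIds.size(),
                       openTransactionalIds.subList(
                               0, Math.min(10, openTransactionalIds.size())));
           }
       }

       public static void main(String[] args) {
           warnBeforeAbort(Arrays.asList("tx-0", "tx-1", "tx-2"));
       }
   }
   ```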



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -264,11 +282,51 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
+        Set<String> ongoingTransactionIds =
+                recoveredStates.stream()
+                        .flatMap(
+                                s ->
+                                        s.getOngoingTransactions().stream()
+                                                .map(CheckpointTransaction::getTransactionalId))
+                        .collect(Collectors.toSet());
         return new TransactionAbortStrategyContextImpl(
+                this::getTopicNames,
                 kafkaSinkContext.getParallelInstanceId(),
                 kafkaSinkContext.getNumberOfParallelInstances(),
                 prefixesToAbort,
                 startCheckpointId,
-                aborter);
+                aborter,
+                this::getAdminClient,
+                ongoingTransactionIds);
+    }
+
+    private Collection<String> getTopicNames() {
+        KafkaDatasetIdentifier identifier =
+                getDatasetIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The record serializer does not expose a static list of target topics."));
+        if (identifier.getTopics() != null) {
+            return identifier.getTopics();
+        }
+        return AdminUtils.getTopicsByPattern(getAdminClient(), identifier.getTopicPattern());
+    }

Review Comment:
   Does the transaction-listing mechanism only work when using the lineage feature? In general, how do we determine the target topics if the topic is only decided by the user-provided serializer?
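   For illustration, the two resolution paths in `getTopicNames` could be sketched as follows. The broker lookup is stood in for by a fixed local topic list; the real code queries the admin client, and all names here are hypothetical:

   ```java
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.List;
   import java.util.regex.Pattern;
   import java.util.stream.Collectors;

   // Hypothetical sketch of topic resolution: prefer a static topic list if the
   // serializer exposes one, otherwise match a topic pattern against the
   // broker's known topics (stubbed here as a plain list).
   public class TopicResolutionSketch {
       static Collection<String> resolve(
               List<String> staticTopics, Pattern topicPattern, List<String> brokerTopics) {
           if (staticTopics != null) {
               return staticTopics; // static list exposed by the serializer
           }
           // Pattern path: filter the broker's topic names by the pattern.
           return brokerTopics.stream()
                   .filter(t -> topicPattern.matcher(t).matches())
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           List<String> broker = Arrays.asList("orders", "orders-dlq", "audit");
           System.out.println(resolve(null, Pattern.compile("orders.*"), broker));
       }
   }
   ```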



##########
flink-connector-kafka/src/test/resources/log4j2-test.properties:
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = WARN

Review Comment:
   Please revert the logging changes before merging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
