fapaul commented on code in PR #154:
URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r1979035891
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriterStateSerializer.java:
##########

@@ -47,8 +54,15 @@ public byte[] serialize(KafkaWriterState state) throws IOException {
     public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
         try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                 final DataInputStream in = new DataInputStream(bais)) {
-            final String transactionalIdPrefx = in.readUTF();
-            return new KafkaWriterState(transactionalIdPrefx);
+            final String transactionalIdPrefix = in.readUTF();
+            final Collection<CheckpointTransaction> ongoingTransactions = new ArrayList<>();
+            if (version >= 2) {

Review Comment:
   Nit: Isn't it better to assert on version 2 and fail for other versions? (See the first sketch below.)

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionNamingStrategyImpl.java:
##########

@@ -53,6 +55,20 @@ public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
             }
             return context.getProducer(context.buildTransactionalId(expectedCheckpointId));
         }
+    },
+    POOLING {
+        @Override
+        public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+                Context context) {
+            Collection<String> usedTransactionalIds = context.getOngoingTransactions();
+            for (int offset = 0; ; offset++) {
+                String transactionalIdCandidate = context.buildTransactionalId(offset);
+                if (usedTransactionalIds.contains(transactionalIdCandidate)) {
+                    continue;
+                }
+                return context.getProducer(transactionalIdCandidate);

Review Comment:
   Do we have a log line somewhere that indicates how big the pool is? Since a growing pool might be an indicator of a serious issue, we should probably log on every new producer creation. (See the second sketch below.)

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkOptions.java:
##########

@@ -72,7 +72,15 @@ public enum TransactionNamingStrategy {
      * <p>This is exactly the same behavior as in flink-connector-kafka 3.X.
      */
     INCREMENTING(
-            TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING);
+            TransactionNamingStrategyImpl.INCREMENTING, TransactionAbortStrategyImpl.PROBING),
+    /**
+     * This strategy reuses transaction names. It is more resource-friendly than {@link
+     * #INCREMENTING} on the Kafka broker.
+     *
+     * <p>It's a new strategy introduced in flink-connector-kafka 4.X. It requires Kafka 3.0+
+     * and additional permissions.

Review Comment:
   Can we list the additional operations, e.g. the calls made through the admin client?
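A minimal sketch of the fail-fast version handling the serializer nit above asks for, as it might sit inside KafkaWriterStateSerializer. This is not the PR's actual code: the exception message and the two-argument KafkaWriterState constructor are assumptions for illustration.

    public KafkaWriterState deserialize(int version, byte[] serialized) throws IOException {
        // Fail fast for versions this serializer does not know, instead of silently
        // treating any future version like version 2.
        if (version != 1 && version != 2) {
            throw new IOException("Unsupported KafkaWriterState version: " + version);
        }
        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                final DataInputStream in = new DataInputStream(bais)) {
            final String transactionalIdPrefix = in.readUTF();
            final Collection<CheckpointTransaction> ongoingTransactions = new ArrayList<>();
            if (version == 2) {
                // ... read the ongoing transactions written by the version-2 serializer ...
            }
            // Assumed constructor shape; the actual signature is defined in the PR.
            return new KafkaWriterState(transactionalIdPrefix, ongoingTransactions);
        }
    }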
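And a sketch of the per-producer logging requested for POOLING, placed directly in the strategy for illustration. The SLF4J LOG field, the log level, and the message wording are assumptions, and the real change may belong inside the producer pool where new producers are actually created.

    Collection<String> usedTransactionalIds = context.getOngoingTransactions();
    for (int offset = 0; ; offset++) {
        String transactionalIdCandidate = context.buildTransactionalId(offset);
        if (usedTransactionalIds.contains(transactionalIdCandidate)) {
            continue;
        }
        // Assumed SLF4J logger; one line per acquired transactional id makes an
        // unexpectedly growing pool visible without enabling TRACE logging.
        LOG.info(
                "Acquiring producer for transactional id {} (offset {}); {} ids currently in use.",
                transactionalIdCandidate,
                offset,
                usedTransactionalIds.size());
        return context.getProducer(transactionalIdCandidate);
    }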
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/TransactionAbortStrategyImpl.java:
##########

@@ -107,6 +119,64 @@ private int abortTransactionOfSubtask(String prefix, int subtaskId, Context cont
             }
             return numTransactionAborted;
         }
+    },
+    LISTING {
+        private final EnumSet<TransactionState> abortableStates =
+                EnumSet.complementOf(
+                        EnumSet.of(
+                                TransactionState.COMPLETE_ABORT,
+                                TransactionState.COMPLETE_COMMIT,
+                                TransactionState.PREPARE_COMMIT));
+
+        @Override
+        public void abortTransactions(Context context) {
+
+            Collection<TransactionListing> openTransactionsForTopics =
+                    AdminUtils.getOpenTransactionsForTopics(
+                            context.getAdminClient(), context.getTopicNames());
+
+            // This list could be huge if TransactionNamingStrategy#INCREMENTING is used, so cap it
+            LOG.trace(

Review Comment:
   Let's warn the user how many transactions will be aborted, so that unwanted behavior can be spotted earlier. (See the sketch below.)

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########

@@ -264,11 +282,51 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
+        Set<String> ongoingTransactionIds =
+                recoveredStates.stream()
+                        .flatMap(
+                                s ->
+                                        s.getOngoingTransactions().stream()
+                                                .map(CheckpointTransaction::getTransactionalId))
+                        .collect(Collectors.toSet());
         return new TransactionAbortStrategyContextImpl(
+                this::getTopicNames,
                 kafkaSinkContext.getParallelInstanceId(),
                 kafkaSinkContext.getNumberOfParallelInstances(),
                 prefixesToAbort,
                 startCheckpointId,
-                aborter);
+                aborter,
+                this::getAdminClient,
+                ongoingTransactionIds);
+    }
+
+    private Collection<String> getTopicNames() {
+        KafkaDatasetIdentifier identifier =
+                getDatasetIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The record serializer does not expose a static list of target topics."));
+        if (identifier.getTopics() != null) {
+            return identifier.getTopics();
+        }
+        return AdminUtils.getTopicsByPattern(getAdminClient(), identifier.getTopicPattern());
+    }

Review Comment:
   Does the list-transactions mechanism only work when using the lineage feature? In general, how do we determine the target topic if the topic is only decided by the user-provided serializer?

##########
flink-connector-kafka/src/test/resources/log4j2-test.properties:
##########

@@ -18,7 +18,7 @@
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = WARN

Review Comment:
   Please revert the logging changes before merging.
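For the LISTING abort comment above, a sketch of promoting the transaction count to a warning before the abort loop runs. The WARN level and the message wording are only suggestions; TransactionListing, AdminUtils, and LOG are used as they appear in the diff above.

    Collection<TransactionListing> openTransactionsForTopics =
            AdminUtils.getOpenTransactionsForTopics(
                    context.getAdminClient(), context.getTopicNames());

    // Surface the size of the candidate set at WARN so an unexpectedly large number
    // of transactions about to be aborted is visible with default logging settings.
    if (!openTransactionsForTopics.isEmpty()) {
        LOG.warn(
                "Found {} open transaction(s) for the sink's topics that may be aborted.",
                openTransactionsForTopics.size());
    }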