fapaul commented on code in PR #152:
URL: https://github.com/apache/flink-connector-kafka/pull/152#discussion_r1954591199

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Manages a pool of {@link FlinkKafkaInternalProducer} instances for reuse in the {@code
+ * KafkaWriter} and keeps track of the used transactional ids.
+ *
+ * <p>Reusing the producers is important for performance reasons. The producer initialization
+ * includes a few requests to the broker (e.g., ApiVersion), which can be avoided with reuse.
+ *
+ * <p>Tracking the transactional ids in use can be tricky because the {@code KafkaCommitter} is
+ * ultimately finishing the transactions. There are two major cases:
+ * <li>The committer is chained to the writer (common case): The {@code KafkaCommittable} contains
+ *     the producer (in-memory transfer) and the producer is only returned to the producer pool upon
+ *     completion by the committer. Thus, none of the producers in the pool have active
+ *     transactions.
+ * <li>The committer is not chained: The {@code KafkaCommittableSerializer} will return the producer
+ *     to this pool, but it still has an ongoing transaction. The producer will be "cloned" in the
+ *     committer by using producer id and epoch. In this case, we rely on {@link
+ *     org.apache.kafka.common.errors.ProducerFencedException} to test later if a producer in the
+ *     pool is still in the transaction or not.
+ */
+public class ProducerPoolImpl implements ProducerPool {
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
+
+    /**
+     * The configuration for the Kafka producer. This is used to create new producers when the pool
+     * is empty.
+     */
+    private final Properties kafkaProducerConfig;
+    /** Callback to allow the writer to init metrics. */
+    private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit;
+    /**
+     * The pool of producers that are available for reuse. This pool is used to avoid creating new
+     * producers for every transaction.
+     */
+    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
+            new ArrayDeque<>();
+    /**
+     * The map of ongoing transactions (id -> producer/CheckpointTransaction). This is used to keep
+     * track of the transactions that are ongoing and the respective producers are not in the pool.
+     */
+    private final Map<String, ProducerEntry> producerByTransactionalId = new TreeMap<>();

Review Comment:
   For my understanding: this map only holds the writer's "brain" and is not persisted. So it only holds producers that the writer has used since the last failover.

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.IOUtils.closeAll;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Manages a pool of {@link FlinkKafkaInternalProducer} instances for reuse in the {@code
+ * KafkaWriter} and keeps track of the used transactional ids.
+ *
+ * <p>Reusing the producers is important for performance reasons. The producer initialization
+ * includes a few requests to the broker (e.g., ApiVersion), which can be avoided with reuse.
+ *
+ * <p>Tracking the transactional ids in use can be tricky because the {@code KafkaCommitter} is
+ * ultimately finishing the transactions. There are two major cases:
+ * <li>The committer is chained to the writer (common case): The {@code KafkaCommittable} contains
+ *     the producer (in-memory transfer) and the producer is only returned to the producer pool upon
+ *     completion by the committer. Thus, none of the producers in the pool have active
+ *     transactions.
+ * <li>The committer is not chained: The {@code KafkaCommittableSerializer} will return the producer
+ *     to this pool, but it still has an ongoing transaction. The producer will be "cloned" in the
+ *     committer by using producer id and epoch. In this case, we rely on {@link
+ *     org.apache.kafka.common.errors.ProducerFencedException} to test later if a producer in the
+ *     pool is still in the transaction or not.
+ */
+public class ProducerPoolImpl implements ProducerPool {
+    private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class);
+
+    /**
+     * The configuration for the Kafka producer. This is used to create new producers when the pool
+     * is empty.
+     */
+    private final Properties kafkaProducerConfig;
+    /** Callback to allow the writer to init metrics. */
+    private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit;
+    /**
+     * The pool of producers that are available for reuse. This pool is used to avoid creating new
+     * producers for every transaction.
+     */
+    private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
+            new ArrayDeque<>();
+    /**
+     * The map of ongoing transactions (id -> producer/CheckpointTransaction). This is used to keep
+     * track of the transactions that are ongoing and the respective producers are not in the pool.
+     */
+    private final Map<String, ProducerEntry> producerByTransactionalId = new TreeMap<>();
+    /**
+     * A secondary tracking structure to quickly find transactions coming from earlier
+     * checkpoints.
+     */
+    private final NavigableMap<CheckpointTransaction, String> transactionalIdsByCheckpoint =
+            new TreeMap<>(Comparator.comparing(CheckpointTransaction::getCheckpointId));
+
+    /** Creates a new {@link ProducerPoolImpl}. */
+    public ProducerPoolImpl(
+            Properties kafkaProducerConfig,
+            Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit) {
+        this.kafkaProducerConfig =
+                checkNotNull(kafkaProducerConfig, "kafkaProducerConfig must not be null");
+        this.producerInit = checkNotNull(producerInit, "producerInit must not be null");
+    }
+
+    @Override
+    public void transactionFinished(String transactionalId) {
+        ProducerEntry producerEntry = producerByTransactionalId.remove(transactionalId);
+        LOG.debug("Transaction {} finished, producer {}", transactionalId, producerEntry);
+        if (producerEntry == null) {
+            // during recovery, the committer may finish transactions that are not yet ongoing from
+            // the writer's perspective
+            return;
+        }
+
+        transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
+        recycleProducer(producerEntry.getProducer());
+
+        // In rare cases (only for non-chained committer), some transactions may not be detected to
+        // be finished.
+        // For example, a transaction may be committed at the same time the writer state is
+        // snapshot. The writer contains the transaction as ongoing but the committer state will
+        // later not contain it.
+        // In these cases, we make use of the fact that committables are processed in order of the
+        // checkpoint id.
+        // That means a transaction state with checkpoint id C implies that all C' < C are finished.
+        NavigableMap<CheckpointTransaction, String> earlierTransactions =
+                transactionalIdsByCheckpoint.headMap(
+                        producerEntry.getCheckpointedTransaction(), false);
+        if (!earlierTransactions.isEmpty()) {
+            for (String id : earlierTransactions.values()) {
+                ProducerEntry entry = producerByTransactionalId.remove(id);
+                recycleProducer(entry.getProducer());
+            }
+            earlierTransactions.clear();
+        }
+    }
+
+    @Override
+    public void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+        recycleProducer(producer);
+        ProducerEntry producerEntry =
+                producerByTransactionalId.remove(producer.getTransactionalId());
+        transactionalIdsByCheckpoint.remove(producerEntry.getCheckpointedTransaction());
+    }
+
+    private void recycleProducer(@Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+        // In case of recovery, we don't create a producer for the ongoing transactions.
+        // The producer is just initialized on committer side.
+        if (producer == null) {
+            return;
+        }
+        // For non-chained committer, we have a split brain scenario:
+        // Both the writer and the committer have a producer representing the same transaction.
+        // The committer producer has finished the transaction while the writer producer is still in
+        // transaction. In this case, we forcibly complete the transaction, such that we can
+        // initialize it.
+        if (producer.isInTransaction()) {
+            producer.transactionCompletedExternally();
+        }
+        producerPool.add(producer);
+
+        LOG.debug("Recycling {}, new pool size {}", producer, producerPool.size());
+    }
+
+    @Override
+    public FlinkKafkaInternalProducer<byte[], byte[]> getTransactionalProducer(
+            String transactionalId, long checkpointId) {
+        FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
+        if (producer == null) {
+            producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
+            producerInit.accept(producer);
+        } else if (transactionalId != null) {
+            producer.setTransactionId(transactionalId);
+        }
+        if (transactionalId != null) {
+            // first keep track of the transaction and producer because initTransaction may be
+            // interrupted
+            CheckpointTransaction checkpointedTransaction =
+                    new CheckpointTransaction(transactionalId, checkpointId);
+            ProducerEntry existing =
+                    producerByTransactionalId.put(
+                            transactionalId, new ProducerEntry(producer, checkpointedTransaction));
+            transactionalIdsByCheckpoint.put(checkpointedTransaction, transactionalId);
+            checkState(
+                    existing == null,
+                    "Transaction %s already ongoing existing producer %s; new producer %s",
+                    transactionalId,
+                    existing,
+                    producer);
+            producer.initTransactions();
+        }
+        LOG.debug("getProducer {}, new pool size {}", producer, producerPool.size());
+        return producer;
+    }
+
+    @VisibleForTesting
+    public Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProducers() {
+        return producerPool;
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.debug(
+                "Closing used producers {} and free producers {}",
+                producerByTransactionalId,
+                producerPool);
+        closeAll(
+                () -> closeAll(producerPool),
+                () ->
+                        closeAll(
+                                producerByTransactionalId.values().stream()
+                                        .map(ProducerEntry::getProducer)
+                                        .collect(Collectors.toList())),
+                producerPool::clear,
+                producerByTransactionalId::clear);
+    }
+
+    private static class ProducerEntry {
+        private final FlinkKafkaInternalProducer<byte[], byte[]> producer;
+        private final CheckpointTransaction checkpointedTransaction;
+
+        private ProducerEntry(
+                FlinkKafkaInternalProducer<byte[], byte[]> producer,
+                CheckpointTransaction checkpointedTransaction) {
+            this.producer = checkNotNull(producer, "producer must not be null");
+            this.checkpointedTransaction =
+                    checkNotNull(
+                            checkpointedTransaction, "checkpointedTransaction must not be null");
+        }
+
+        public CheckpointTransaction getCheckpointedTransaction() {
+            return checkpointedTransaction;
+        }
+
+        public FlinkKafkaInternalProducer<byte[], byte[]> getProducer() {
+            return producer;
+        }
+
+        @Override
+        public String toString() {
+            if (producer != null) {

Review Comment:
   Nit: I think the producer cannot be null at this point.

##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPool.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink.internal;
+
+/** A pool of producers that can be recycled. */
+public interface ProducerPool extends AutoCloseable {
+    /**
+     * Notify the pool that a transaction has finished. The producer with the given transactional id
+     * can be recycled.
+     */
+    void transactionFinished(String transactionalId);

Review Comment:
   Rename to recycle
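   For illustration, a rough sketch of how the interface could read with that rename, so both hand-back paths become overloads of recycle. This is only a sketch of the suggestion, not the final API; it shows just the two methods in question and assumes the existing recycle(FlinkKafkaInternalProducer) method keeps its current meaning:

       // Sketch only: ProducerPool with transactionFinished renamed to recycle.
       public interface ProducerPool extends AutoCloseable {

           /** Recycles the producer that owns the given transactional id once its transaction has finished. */
           void recycle(String transactionalId);

           /** Recycles a producer that is handed back directly after its transaction completed. */
           void recycle(FlinkKafkaInternalProducer<byte[], byte[]> producer);
       }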