AHeise commented on code in PR #152: URL: https://github.com/apache/flink-connector-kafka/pull/152#discussion_r1956126504
########## flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/ProducerPoolImpl.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.sink.internal; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Comparator; +import java.util.Deque; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Properties; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.apache.flink.util.IOUtils.closeAll; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Manages a pool of {@link FlinkKafkaInternalProducer} instances for reuse in the {@code + * KafkaWriter} and keeps track of the used transactional ids. + * + * <p>Reusing the producers is important for performance reasons. 
The producer initialization + * includes a few requests to the broker (e.g., ApiVersion), which can be avoided with reuse. + * + * <p>Tracking the transactional ids in use can be tricky because the {@code KafkaCommitter} is + * ultimately finishing the transactions. There are two major cases: + * <li>The committer is chained to the writer (common case): The {@code KafkaCommittable} contains + * the producer (in-memory transfer) and the producer is only returned to the producer pool upon + * completion by the committer. Thus, none of the producers in the pool have active + * transactions. + * <li>The committer is not chained: The {@code KafkaCommittableSerializer} will return the producer + * to this pool, but it still has an ongoing transaction. The producer will be "cloned" in the + * committer by using producer id and epoch. In this case, we rely on {@link + * org.apache.kafka.common.errors.ProducerFencedException} to test later if a producer in the + * pool is still in the transaction or not. + */ +public class ProducerPoolImpl implements ProducerPool { + private static final Logger LOG = LoggerFactory.getLogger(ProducerPoolImpl.class); + + /** + * The configuration for the Kafka producer. This is used to create new producers when the pool + * is empty. + */ + private final Properties kafkaProducerConfig; + /** Callback to allow the writer to init metrics. */ + private final Consumer<FlinkKafkaInternalProducer<byte[], byte[]>> producerInit; + /** + * The pool of producers that are available for reuse. This pool is used to avoid creating new + * producers for every transaction. + */ + private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool = + new ArrayDeque<>(); + /** + * The map of ongoing transactions (id -> producer/CheckpointTransaction). This is used to keep + * track of the transactions that are ongoing and the respective producers are not in the pool. 
+ */ + private final Map<String, ProducerEntry> producerByTransactionalId = new TreeMap<>(); Review Comment: Exactly. However, in the last PR, the state is actually persisted because for an id pool, we also need to know the ongoing transactions in the committer state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org