[ https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860823#comment-17860823 ]
Martijn Visser commented on FLINK-34554:
----------------------------------------

[~loserwang1024] I think that would require a discussion on the Dev mailing list and potentially a FLIP. It would be great if you could start such a discussion thread.

> Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created transactionalId per checkpoint
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34554
>                 URL: https://issues.apache.org/jira/browse/FLINK-34554
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.3, 1.17.2, 1.18.1
>            Reporter: Hilmi Al Fatih
>            Priority: Major
>         Attachments: image (4).png, image (5).png, image-2024-03-16-17-17-16-152.png
>
>
> Flink version: 1.17.1
> Kafka broker version: 2.7.1, with 4 GB of heap memory for each broker
>
> Hi, we recently had an outage in our production system after we performed a Flink kafka-connector API upgrade. To give some brief context, our application is a simple Kafka-to-Kafka pipeline with minimal processing. We run in EXACTLY_ONCE mode, so Kafka transactions are involved.
> Our application runs with around 350 sink subtasks in total. The checkpoint period was set to 5 seconds to avoid blocking {{read_committed}} consumers for too long.
> We recently performed an upgrade with the following details:
>
> Previous state:
> * Flink version: 1.14.4
> * Broker version: 2.7.1
> * Kafka connector API: FlinkKafkaProducer
>
> Updated to:
> * Flink version: 1.17.1
> * Broker version: 2.7.1
> * Kafka connector API: KafkaSink
>
> Around 10 hours after the deployment, our Kafka broker started failing with OOM errors. The heap dump entries are dominated by ProducerStateEntry records.
> Our investigation traced this to a fundamental implementation difference between FlinkKafkaProducer and KafkaSink:
> * KafkaSink generates a different transactionalId for each checkpoint.
> * FlinkKafkaProducer uses a constant pool of transactionalIds.
>
> With this behavior, KafkaSink seemed to exhaust our broker heap very quickly, because a ProducerStateEntry only expires after {{transactional.id.expiration.ms}}, which is set to 7 days by default.
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677], [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268], [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])
>
> For our job, this means it creates roughly:
> * after 10 hours of running: 350 ids * 12 checkpoints/minute (one every 5 seconds) * 60 min/hour * 10 hours ~ 2,520,000 entries
> * after 7 days (the default expiration): ~ 42 million entries
>
> Attached below is the number of ProducerStateEntry entries in the heap dump at the time of the OOM:
> * 505,000 (6.5%); in total this would be roughly 7,000,000 entries.
>
> Several mitigations came to mind:
> * Reduce the number of subtasks, which reduces the number of transactionalIds.
> * Lengthen the checkpoint interval to reduce the rate at which new transactionalIds are generated.
> * Shorten {{transactional.id.expiration.ms}} so that unused transactionalIds expire sooner.
> * Increase the broker heap.
>
> However, these mitigations are cumbersome and need careful tuning, which harms our flexibility. In addition, because it does not keep track of lingering transaction state, TransactionAborter appears to abort old transactions naively. We might accidentally (or purposefully) reuse the same transactionalIdPrefix and start the counter from 0.
> In that case, if the old transactionalId happens to have an epoch > 0, TransactionAborter keeps looping, aborting nonexistent transactions all the way up to the latest checkpoint counter (which may be very large), and the job gets stuck.
>
> By the way, I am aware that in Flink 2.0 you guys are putting a lot of effort into creating better integration with Kafka transactions ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]). FLIP-319 mentions TID pooling, but there does not seem to be a dedicated page for it yet. Is there already a concrete plan I can follow? If there is something I can contribute to, I would be really happy to help.
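The reporter's growth figures can be reproduced with a back-of-the-envelope model. The sketch below is a hypothetical helper: `TransactionalIdGrowth` is not a Flink or Kafka class. It assumes that every sink subtask opens exactly one new transactionalId per checkpoint, and that broker-side state for an id lives for roughly one expiration window. It counts transactionalIds, not ProducerStateEntry objects. Brokers keep producer state per partition, so the entry count can be a multiple of this figure. The last lines show how two of the listed mitigations (a longer checkpoint interval and a shorter expiration) scale the result.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of Flink or Kafka) that estimates how many
 * transactionalIds a sink creates when each subtask uses a fresh id per checkpoint.
 */
final class TransactionalIdGrowth {

    private TransactionalIdGrowth() {}

    /** Ids created by all subtasks over the given runtime: one per subtask per checkpoint. */
    static long idsCreated(int parallelism, Duration checkpointInterval, Duration runtime) {
        long checkpoints = runtime.toMillis() / checkpointInterval.toMillis();
        return parallelism * checkpoints;
    }

    /**
     * Approximate number of ids whose broker-side state is still retained in steady state:
     * roughly one expiration window's worth. Ignores the broker's periodic cleanup interval,
     * so the real peak can be somewhat higher.
     */
    static long steadyStateLiveIds(int parallelism, Duration checkpointInterval, Duration expiration) {
        return idsCreated(parallelism, checkpointInterval, expiration);
    }

    public static void main(String[] args) {
        int parallelism = 350;                      // sink subtasks in the report
        Duration interval = Duration.ofSeconds(5);  // checkpoint period in the report
        Duration expiration = Duration.ofDays(7);   // default transactional.id.expiration.ms

        System.out.println(idsCreated(parallelism, interval, Duration.ofHours(10)));  // 2520000
        System.out.println(steadyStateLiveIds(parallelism, interval, expiration));    // 42336000

        // Mitigations from the report, each changing a single knob:
        System.out.println(steadyStateLiveIds(parallelism, Duration.ofSeconds(30), expiration)); // 7056000
        System.out.println(steadyStateLiveIds(parallelism, interval, Duration.ofHours(1)));      // 252000
        System.out.println(steadyStateLiveIds(100, interval, expiration));                       // 12096000
    }
}
```

Under this model, the retained count is proportional to parallelism and to the expiration window, and inversely proportional to the checkpoint interval. This is why each mitigation in the report only moves the problem by a constant factor.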
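The report identifies a core difference between the two sinks: fresh ids per checkpoint versus a fixed pool. A toy comparison of the two naming schemes makes it concrete. This is a sketch, not Flink code. The `prefix-subtask-checkpoint` format is assumed to resemble what KafkaSink builds. The pooled variant, with a caller-chosen `poolSize`, only stands in for FlinkKafkaProducer's fixed pool. All class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical comparison of two transactionalId naming schemes (not Flink code). */
final class TransactionalIdSchemes {

    private TransactionalIdSchemes() {}

    /** Per-checkpoint scheme: a brand-new id for every subtask and checkpoint (assumed format). */
    static String perCheckpointId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /** Pooled scheme: each subtask cycles through a fixed set of poolSize ids. */
    static String pooledId(String prefix, int subtaskId, long checkpointId, int poolSize) {
        return prefix + "-" + subtaskId + "-" + Math.floorMod(checkpointId, (long) poolSize);
    }

    /** Counts the distinct ids the broker would ever see for the given run length. */
    static int distinctIds(int parallelism, long checkpoints, boolean pooled, int poolSize) {
        Set<String> ids = new HashSet<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (long checkpoint = 1; checkpoint <= checkpoints; checkpoint++) {
                ids.add(pooled
                        ? pooledId("job", subtask, checkpoint, poolSize)
                        : perCheckpointId("job", subtask, checkpoint));
            }
        }
        return ids.size();
    }

    public static void main(String[] args) {
        // 350 subtasks, one hour of 5-second checkpoints (720 checkpoints).
        System.out.println(distinctIds(350, 720, false, 5)); // 252000
        System.out.println(distinctIds(350, 720, true, 5));  // 1750
    }
}
```

With a pool, the number of distinct ids is bounded by parallelism times pool size however long the job runs. With per-checkpoint ids, it grows linearly with the number of checkpoints.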
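The report's last paragraph describes an abort loop triggered by a reused transactionalIdPrefix, and a toy simulation can illustrate it. As a simplification, it assumes the aborter probes ids `prefix-subtask-N` for increasing N. For each id it does something like the producer's `initTransactions()`, and it stops at the first id whose epoch comes back as 0, meaning the broker has never seen it. The broker is modeled as an in-memory map from transactionalId to epoch. `AbortProbeSimulation` and its methods are hypothetical and do not reproduce Flink's TransactionAborter exactly.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (hypothetical, not Flink's TransactionAborter) of an aborter that probes
 * ids prefix-subtask-N for increasing N until it reaches an id that was never used.
 */
final class AbortProbeSimulation {

    /** Stand-in for broker-side state: transactionalId -> producer epoch. */
    private final Map<String, Integer> epochs = new HashMap<>();

    /** Mimics initTransactions(): an unknown id starts at epoch 0, a known id has its epoch bumped. */
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 0, (oldEpoch, unused) -> oldEpoch + 1);
    }

    /** Simulates an earlier run that used checkpoints 1..lastCheckpointId with the same prefix. */
    void simulatePreviousRun(String prefix, int subtaskId, long lastCheckpointId) {
        for (long checkpointId = 1; checkpointId <= lastCheckpointId; checkpointId++) {
            initTransactions(prefix + "-" + subtaskId + "-" + checkpointId);
        }
    }

    /** Returns how many ids were probed before an unused one (epoch 0) was found. */
    long abortLingering(String prefix, int subtaskId, long startCheckpointId) {
        long probed = 0;
        for (long checkpointId = startCheckpointId; ; checkpointId++) {
            probed++;
            if (initTransactions(prefix + "-" + subtaskId + "-" + checkpointId) == 0) {
                return probed; // first id the broker has never seen: stop probing
            }
        }
    }

    public static void main(String[] args) {
        AbortProbeSimulation broker = new AbortProbeSimulation();
        broker.simulatePreviousRun("job", 0, 1_000);
        // Restarting with the same prefix and the counter reset to 1:
        System.out.println(broker.abortLingering("job", 0, 1)); // 1001
    }
}
```

In this model the probe count equals the previous run's last checkpoint plus one. At a 5-second interval, one week of checkpoints is about 120,000 per subtask, and each probe in a real deployment is a broker round trip. That is consistent with the report's observation that the job can appear stuck.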