Thank you very much for your answer.

I was able to reduce the number of sinks as you described. That helped a
lot, thank you.

I think you must be right with regards to (2) - opening a new transaction
being the culprit. It's unlikely to be (1) since this behaviour occurs even
when there are 0 messages going through a brand new, locally running kafka
cluster.

Kind regards,
Fil

On Tue, 29 Mar 2022 at 09:34, Arvid Heise <ar...@apache.org> wrote:

> Hi Filip,
>
> two things will impact sync time for Kafka:
> 1. Flushing all old data [1], in particular flushing all in-flight
> partitions [2]. However, that shouldn't cause a stacking effect except when
> the brokers are overloaded on checkpoint.
> 2. Opening a new transaction [3]. Since all transactions are linearized on
> the Kafka brokers, this is the most likely root cause. Note that aborted
> checkpoints may require multiple transactions to be opened. So you could
> check if you have them quite often aborted.
>
> If you want to know more, I suggest you attach a profiler and find the
> specific culprit and report back [4]. There is a low probability that the
> sink framework has a bug that causes this behavior. In that case, we can
> fix it more easily than if it's a fundamental issue with Kafka. In general,
> exactly-once and low latency are somewhat contradicting requirements, so
> there is only so much you can do.
>
> Not knowing your topology but maybe you can reduce the number of sinks?
> With the KafkaRecordSerializationSchema you can set different topics for
> different ProducerRecords of the same DataStream.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L190-L190
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java#L177-L183
> [3]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L302-L321
> [4]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/application_profiling/
>
>
> On Sat, Mar 26, 2022 at 2:11 PM Filip Karnicki <filip.karni...@gmail.com>
> wrote:
>
>> Hi, I noticed that with each added (kafka) sink with exactly-once
>> guarantees, there looks to be a penalty of ~100ms in terms of sync
>> checkpointing time.
>>
>> Would anyone be able to explain and/or point me in the right direction in
>> the source code so that I could understand why that is? Specifically, why
>> there appears to be a 100ms added for _each_ sink, and not a flat 100ms for
>> all sinks, potentially pointing to a sequential set of IO calls (wiiiild
>> guess)
>>
>> I would be keen to understand if there's anything I could do (incl.
>> contributing code) that would parallelise this penalty in terms of sync
>> checkpointing time.
>>
>> Alternatively, is there any setting that would help me bring the sync
>> checkpointing time down (and still get exactly-once guarantees)?
>>
>> Many thanks,
>> Fil
>>
>

Reply via email to