Hi Yun,

thanks for the answer. I did now increased checkpoint interval, but still I
don't understand reason for creating producer and re-connecting to to kafka
broker each time. According to documentation:

Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers
per each FlinkKafkaProducer011 instance. One of each of those producers is
used per one checkpoint. If the number of concurrent checkpoints exceeds
the pool size, FlinkKafkaProducer011 will throw an exception and will fail
the whole application. Please configure max pool size and max number of
concurrent checkpoints accordingly.

I assumed that this is also true for post 011 producers as well. I expected
to have 5 (default) producers created and used without re-instantiating
producer each time. In my case checkpoint is so fast that I will never have
concurrent checkpoints.

Regards,
Maxim.


On Wed, Apr 8, 2020 at 4:52 AM Yun Tang <myas...@live.com> wrote:

> Hi Maxim
>
> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
> for flink kafka producer. It will create new producer when every new
> checkpoint comes [1]. This is by design and from my point of view, the
> checkpoint interval of 10 seconds might be a bit too often. In general I
> think interval of 3 minutes should be enough. If you cannot offer the
> source rewind time after failover, you could turn the interval more often.
>
>
> [1]
> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>
> Best
> Yun Tang
> ------------------------------
> *From:* Maxim Parkachov <lazy.gop...@gmail.com>
> *Sent:* Monday, April 6, 2020 23:16
> *To:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* New kafka producer on each checkpoint
>
> Hi everyone,
>
> I'm trying to test exactly once functionality with my job under production
> load. The job is reading from kafka, using kafka timestamp as event time,
> aggregates every minute and outputs to other kafka topic. I use checkpoint
> interval 10 seconds.
>
> Everything seems to be working fine, but when I look to the log on INFO
> level, I see that with each checkpoint, new kafka producer is created and
> then closed again.
>
> 1. Is this how it is supposed to work ?
> 2. Is checkpoint interval 10 second too often ?
>
> Thanks,
> Maxim.
>

Reply via email to