Hi Maxim,

That is a good question. I don't see an obvious reason that we cannot reuse
the producers. That said, there might be some corner cases where the
KafkaProducer is not reusable. For example, if the checkpoint interval is
long, the producer.id assigned by the TransactionCoordinator may have
expired on the broker side and the producer may not be reusable anymore.
But that should be a rare case.

@Piotr Nowojski <pi...@ververica.com> might know some more reasons that the
producers are not reused when it was initially implemented.

Thanks,

JIangjie (Becket) Qin

On Mon, Apr 13, 2020 at 4:59 PM Maxim Parkachov <lazy.gop...@gmail.com>
wrote:

> Hi Yun,
>
> thanks for the answer. I did now increased checkpoint interval, but still
> I don't understand reason for creating producer and re-connecting to to
> kafka broker each time. According to documentation:
>
> Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers
> per each FlinkKafkaProducer011 instance. One of each of those producers
> is used per one checkpoint. If the number of concurrent checkpoints exceeds
> the pool size, FlinkKafkaProducer011 will throw an exception and will
> fail the whole application. Please configure max pool size and max number
> of concurrent checkpoints accordingly.
>
> I assumed that this is also true for post 011 producers as well. I
> expected to have 5 (default) producers created and used without
> re-instantiating producer each time. In my case checkpoint is so fast that
> I will never have concurrent checkpoints.
>
> Regards,
> Maxim.
>
>
> On Wed, Apr 8, 2020 at 4:52 AM Yun Tang <myas...@live.com> wrote:
>
>> Hi Maxim
>>
>> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
>> for flink kafka producer. It will create new producer when every new
>> checkpoint comes [1]. This is by design and from my point of view, the
>> checkpoint interval of 10 seconds might be a bit too often. In general I
>> think interval of 3 minutes should be enough. If you cannot offer the
>> source rewind time after failover, you could turn the interval more often.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Maxim Parkachov <lazy.gop...@gmail.com>
>> *Sent:* Monday, April 6, 2020 23:16
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* New kafka producer on each checkpoint
>>
>> Hi everyone,
>>
>> I'm trying to test exactly once functionality with my job under
>> production load. The job is reading from kafka, using kafka timestamp as
>> event time, aggregates every minute and outputs to other kafka topic. I use
>> checkpoint interval 10 seconds.
>>
>> Everything seems to be working fine, but when I look to the log on INFO
>> level, I see that with each checkpoint, new kafka producer is created and
>> then closed again.
>>
>> 1. Is this how it is supposed to work ?
>> 2. Is checkpoint interval 10 second too often ?
>>
>> Thanks,
>> Maxim.
>>
>

Reply via email to