Hi Nick,

I'm not entirely sure that I understand your setup correctly.

Basically, with exactly-once and checkpointing enabled, Flink will only
consume messages that have been committed.
If you chain two Flink jobs with an intermediate Kafka topic, then the
first Flink job will only commit messages on checkpoints, and thus the
second Flink job will only read these messages with a delay of up to the
checkpoint interval.

Now, if your input record is created with a different tool, make sure that
you commit it immediately. Flink should then process that record
immediately as well. However, note that Flink again writes the record in a
transaction. Thus, if your tests check the output, you would need to
configure your reader to read uncommitted data [1].
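
As a minimal sketch of the reader configuration mentioned above, the
consumer properties for a test-side reader could look like this. The class
name, method name, broker address and group id are made up for
illustration; only the property keys and the "read_uncommitted" value come
from the Kafka consumer configuration [1]. Note that read_uncommitted is
Kafka's default, so this mainly matters if your reader was explicitly set
to read_committed.

```java
import java.util.Properties;

// Hypothetical helper for a test-side reader; the class and method names are
// illustrative only and are not part of Kafka or Flink.
class TestReaderConfig {

    static Properties readUncommitted(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Also return records from transactions that are still open (or were
        // aborted), instead of waiting for the commit on the next checkpoint.
        props.setProperty("isolation.level", "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address and group id for a local test setup.
        Properties props = readUncommitted("localhost:9092", "output-check");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

These properties would then be passed to whatever consumer your test uses
to check the output topic.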

You can decrease the latency by decreasing the checkpointing interval. If
you need very low latency, you might also check whether you really
need exactly-once (that's typically not necessary).
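
A rough sketch of what a shorter checkpointing interval could look like in
the job setup, assuming the DataStream API of a Flink 1.x release and the
Flink streaming dependency on the classpath. The class name and the
500 ms / 100 ms values are placeholders for illustration, not
recommendations; very short intervals add checkpointing overhead.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical job skeleton; only the checkpoint settings are of interest.
class LowLatencyCheckpointing {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumed value: trigger a checkpoint every 500 ms. With a
        // transactional Kafka sink, output is committed when a checkpoint
        // completes, so this bounds how long records stay uncommitted.
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);

        // Assumed value: leave at least 100 ms between the end of one
        // checkpoint and the start of the next.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);

        // Build the Kafka source/sink pipeline here and call env.execute().
    }
}
```

If exactly-once turns out not to be needed, the second argument could be
CheckpointingMode.AT_LEAST_ONCE instead, together with a matching setting
on the Kafka sink.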

[1] https://kafka.apache.org/documentation/#consumerconfigs_isolation.level

On Mon, Dec 28, 2020 at 3:07 AM Danny Chan <danny0...@apache.org> wrote:

> Hi Nick,
> The behavior is as expected, because the Kafka source/sink relies on
> checkpoints to implement exactly-once write semantics. A checkpoint
> snapshots the state at a point in time, which is used for recovery. The
> Kafka sink currently writes to Kafka but only commits when a checkpoint
> completes.
>
> For your needs, I guess you want near-real-time writes while still
> keeping exactly-once semantics. I'm sorry to say that there is no other
> infrastructure that we can use for exactly-once semantics except for
> checkpoints.
>
> On Sun, Dec 27, 2020 at 3:12 PM nick toker <nick.toker....@gmail.com> wrote:
>
>> Hi
>>
>> any idea?
>> is it a bug?
>>
>>
>> regards,
>> nick
>>
>> On Wed, Dec 23, 2020 at 11:10, nick toker <nick.toker....@gmail.com>
>> wrote:
>>
>>> Hello
>>>
>>> We noticed the following behavior:
>>> If we enable Flink checkpoints, we see a delay between
>>> the time we write a message to the Kafka topic and the time the Flink Kafka
>>> connector consumes this message.
>>> The delay is closely related to checkpointInterval and/or
>>> minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a
>>> message from Kafka will be one of these parameters.
>>>
>>> If we disable checkpoints, the message is consumed immediately.
>>> We work with the EXACTLY_ONCE semantic.
>>> Please note that we inject only one message.
>>>
>>> Could you please advise how we can remove/control this delay?
>>>
>>> Please see the attached code of AbstractFetcher and KafkaFetcher (as a
>>> png file).
>>> (For example, emitRecordsWithTimestamps() uses a lock on checkpointLock.)
>>> Could this explain the behaviour?
>>>
>>>
>>> BR
>>>
>>

-- 

Arvid Heise | Senior Java Developer
