+ user@f.a.o  (adding the conversation back to the user mailing list)

On Fri, Mar 12, 2021 at 6:06 AM Kevin Kwon <fsw0...@gmail.com> wrote:

> Thanks Tzu-Li
>
> Interesting algorithm. Is consumer offset also committed to Kafka at the
> last COMMIT stage after the checkpoint has completed?
>

Flink does commit the offsets back to Kafka when sources perform
checkpoints, but those offsets are not used for fault-tolerance and restore
by Flink. They are purely used as a means for exposing consumption progress.
Flink only respects the offsets being written to its checkpoints. Those
offsets are essentially the state of the FlinkKafkaConsumer sources, and
are written to checkpoints by the sources.
As previously explained, the last COMMIT stage comes after that, i.e. after
all Flink operators complete their state checkpoint.
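
To make the distinction concrete, here is a minimal toy sketch in Python. It
is not Flink code: ToySource and all of its methods are made-up names, and
it assumes the offset commit back to Kafka happens on the checkpoint
completion notification. It only illustrates that restore reads the offset
from the checkpoint and ignores whatever was committed to Kafka.

```python
# Toy model, not Flink code. All names here are hypothetical.

class ToySource:
    def __init__(self):
        self.offset = 0              # next record to read; this is the source's state
        self.kafka_committed = None  # what a monitoring tool would see in Kafka

    def read(self, n):
        self.offset += n

    def snapshot(self):
        # The offset is written into the checkpoint as operator state.
        return {"offset": self.offset}

    def notify_checkpoint_complete(self, snapshot):
        # Assumed behaviour: commit back to Kafka purely to expose progress.
        self.kafka_committed = snapshot["offset"]

    def restore(self, snapshot):
        # Recovery uses the checkpointed offset and ignores kafka_committed.
        self.offset = snapshot["offset"]


source = ToySource()
source.read(100)
chk1 = source.snapshot()
source.notify_checkpoint_complete(chk1)  # Kafka now shows 100

source.read(50)
chk2 = source.snapshot()  # checkpoint 2 completes with offset 150 ...
# ... but the job fails before the offset commit for checkpoint 2 reaches Kafka.

recovered = ToySource()
recovered.kafka_committed = source.kafka_committed  # still 100 in Kafka
recovered.restore(chk2)
print(recovered.offset, recovered.kafka_committed)  # prints: 150 100
```

The restored source resumes from 150 even though Kafka still reports 100.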


>
> Also does the coordinator (JM) write any data in write-ahead-log before
> sending out commit messages to all Flink entities? I'm concerned when JM
> succeeds sending a commit message to some entities but fails to others and
> dies.
>

No. And indeed, while Flink guarantees that checkpoint-complete
notifications will eventually be received by all listening operators (e.g.
the Kafka sinks), the job can indeed fail when only some of the sinks
have received the notification (and committed).
The way Flink handles the issue you mentioned is that all pending-commit
transaction ids are part of the sink's state.
When a sink checkpoints its state (during the pre-commit phase), it writes
all pending-commit transaction ids. If for any reason the job fails and
failover is triggered, the restored latest complete checkpoint will
contain those pending-commit transaction ids.
Flink then attempts to commit those pending transactions again.
So, in the end, you can see this as the transactions all eventually being
successfully committed, even in the event of a failure.
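
As a rough illustration of that recovery path, here is a toy simulation in
Python. It is not Flink or Kafka code: ToyKafka, ToySink and their methods
are hypothetical names, and the sketch assumes that committing an
already-committed transaction is a harmless no-op. It also does not model
aborting transactions that were opened after the checkpoint.

```python
# Toy model of a transactional sink under 2PC. All names are hypothetical.

class ToyKafka:
    def __init__(self):
        self.open_txns = {}  # txn id -> records written but not yet visible
        self.committed = []  # records visible to read_committed consumers

    def write(self, txn_id, record):
        self.open_txns.setdefault(txn_id, []).append(record)

    def commit(self, txn_id):
        # Assumption: committing an unknown or finished txn does nothing.
        self.committed.extend(self.open_txns.pop(txn_id, []))


class ToySink:
    def __init__(self, kafka, name, counter=0):
        self.kafka, self.name, self.counter = kafka, name, counter
        self.pending = []             # pre-committed txn ids awaiting COMMIT
        self.current = self._begin()

    def _begin(self):                 # BEGIN_TXN
        self.counter += 1
        return f"{self.name}-txn-{self.counter}"

    def write(self, record):
        self.kafka.write(self.current, record)

    def snapshot(self):               # PRE_COMMIT, on the checkpoint barrier
        self.pending.append(self.current)
        self.current = self._begin()  # later records go to the next txn
        return {"pending": list(self.pending), "counter": self.counter}

    def notify_checkpoint_complete(self):  # COMMIT
        for txn_id in self.pending:
            self.kafka.commit(txn_id)
        self.pending.clear()

    @classmethod
    def restore(cls, kafka, name, state):
        sink = cls(kafka, name, counter=state["counter"])
        # Re-commit every txn id the checkpoint recorded as pending.
        for txn_id in state["pending"]:
            kafka.commit(txn_id)
        return sink


kafka = ToyKafka()
a, b = ToySink(kafka, "a"), ToySink(kafka, "b")
a.write("a1")
b.write("b1")

# Both sinks pre-commit and acknowledge, so the checkpoint is complete.
checkpoint = {"a": a.snapshot(), "b": b.snapshot()}

a.notify_checkpoint_complete()  # sink a is notified and commits
# The job fails here, before sink b receives the notification.
before_restore = list(kafka.committed)

# Failover: both sinks restore from the same complete checkpoint.
a2 = ToySink.restore(kafka, "a", checkpoint["a"])  # re-commit is a no-op
b2 = ToySink.restore(kafka, "b", checkpoint["b"])  # commits b's pending txn
print(before_restore, kafka.committed)  # prints: ['a1'] ['a1', 'b1']
```

Since the pending ids live in the checkpoint itself, the JM does not need a
separate write-ahead log to finish the commits after a restart.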


>
> Finally, seems 2PC is implemented in order to make 3 entities, Kafka
> producer data / Kafka consumer offset / Flink Checkpoint to be in
> consistent state. However, since checkpoint is an ever increasing state
> like ledger that prunes the previous state as it goes, isn't
> write-ahead-log in the sink side enough to handle the exactly-once
> processing guarantee? What I mean is checking the state between the WAL and the
> current checkpoint status and conservatively rollback to previous
> checkpoint and replay all data
>
> On Thu, Mar 11, 2021 at 7:44 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Kevin,
>>
>> Perhaps the easiest way to answer your question is to go through how the
>> exactly-once FlinkKafkaProducer works, using a 2PC implementation on top of
>> Flink's checkpointing mechanism.
>>
>> The phases can be broken down as follows (simplified assuming max 1
>> concurrent checkpoint and that checkpoint completion notifications are
>> never late):
>>
>>    1. BEGIN_TXN: In between each Flink checkpoint, each
>>    FlinkKafkaProducer sink operator creates a new Kafka transaction. You can
>>    assume that on startup, a new Kafka transaction is created immediately for
>>    records that occur before the first checkpoint.
>>    2. PRE_COMMIT: Once a FlinkKafkaProducer sink operator receives
>>    Flink's checkpoint barrier, it flushes pending records to the current open
>>    transaction, and opens a new one for future records, which belongs to the
>>    next checkpoint and thus should be written to the next transaction. Once
>>    flushed, the sink operator acknowledges it has completed its checkpoint.
>>    3. COMMIT: Once all sinks acknowledge checkpoint completion, the
>>    Flink checkpoint is considered complete (containing state of all operators
>>    + consumer offsets). Once that happens, Flink notifies each sink operator
>>    of the completion, and only upon receiving this notification, can the sink
>>    operator commit the previous transaction.
>>
>> There are some edge cases that are handled, e.g. a checkpoint is
>> considered complete, but the job fails before all sinks receive the
>> completion notification and commit their transactions (that's why txn
>> ids are written into the checkpoint as well, to make sure all txns
>> belonging to that checkpoint are still eventually committed after restore).
>>
>> The general takeaway is that each parallel sink operator can commit the
>> Kafka transactions only after all participants in the 2PC (i.e. all Flink
>> operators and sinks) acknowledge that they are ready to commit.
>> In Flink terms, the JM is the coordinator, and an operator / sink
>> completing their checkpoint is acknowledging that they are ready for
>> committing.
>>
>> From an end-to-end point of view, downstream consumers of the output
>> Kafka topic will not see records (assuming they are consuming in Kafka's
>> read_committed mode) until the upstream Flink application sink commits the
>> open Kafka transactions.
>> This means that the read latency for downstream applications is at
>> least the upstream Flink app's checkpoint interval.
>>
>> Hope this helps!
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Mar 10, 2021 at 5:20 PM Kevin Kwon <fsw0...@gmail.com> wrote:
>>
>>> Hi team, I just have a bit of confusion about how Two Phase Commit and
>>> Kafka's transaction-aware producer using transactional.id and
>>> enable.auto.commit play together
>>>
>>> what I understand of Flink checkpoint (correct me if I'm wrong) is that
>>> it saves the transaction ID as well as the consumer's commit offsets, so
>>> when application fails and restarts, it will reprocess everything from the
>>> last checkpoint and data will be idempotently processed in the Kafka side.
>>> (exactly-once processing rather than exactly-once delivery)
>>>
>>> the question is where does 2 phase commit play a role here?
>>>
>>
