Yes, that is how it works.

> Am 04.01.2018 um 14:47 schrieb Jinhua Luo <luajit...@gmail.com>:
> 
> The TwoPhaseCommitSinkFunction seems to record the transaction status
> in the state just like what I imagine above, correct?
> and if the progress fails before commit, in the later restart, the
> commit would be triggered again, correct? So the commit would not be
> forgotten, correct?
> 
> 2018-01-03 22:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> I think a mix of async UPDATES and exactly-once all this might be tricky,
>> and the typical use case for async IO is more about reads. So let’s take a
>> step back: what would you like to achieve with this? Do you want a
>> read-modify-update (e.g. a map function that queries and updates a DB) or
>> just updates (like a sink based that goes against a DB). From the previous
>> question, I assume the second case applies, in which case I wonder why you
>> even need to be async for a sink? I think a much better approach could be
>> based on Flink's TwoPhaseCommitSinkFunction, and maybe use some some
>> batching to lower update costs.
>> 
>> On top of the TwoPhaseCommitSinkFunction, you could implement transactions
>> against your DB, similar to e.g. this example with Postgres:
>> http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
>> .
>> 
>> Does this help or do you really need async read-modify-update?
>> 
>> Best,
>> Stefan
>> 
>> 
>> Am 03.01.2018 um 15:08 schrieb Jinhua Luo <luajit...@gmail.com>:
>> 
>> No, I mean how to implement exactly-once db commit (given our async io
>> target is mysql), not the state used by flink.
>> As mentioned in previous mail, if I commit db in
>> notifyCheckpointComplete, we have a risk to lost data (lost commit,
>> and flink restart would not trigger notifyCheckpointComplete for the
>> last checkpoint again).
>> On the other hand, if I update and commit per record, the sql/stored
>> procedure have to handle duplicate updates at failure restart.
>> 
>> So, when or where to commit so that we could get exactly-once db ingress.
>> 
>> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> 
>> Hi,
>> 
>> 
>> Then how to implement exactly-once async io? That is, neither missing
>> data or duplicating data.
>> 
>> 
>> From the docs about async IO here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> :
>> 
>> "Fault Tolerance Guarantees:
>> The asynchronous I/O operator offers full exactly-once fault tolerance
>> guarantees. It stores the records for in-flight asynchronous requests in
>> checkpoints and restores/re-triggers the requests when recovering from a
>> failure.“
>> 
>> So it is already handled by Flink in a way that supports exactly-once.
>> 
>> Is there some way to index data by checkpoint id and records which
>> checkpoints already commit to db? But that means we need MapState,
>> right?
>> 
>> 
>> The information required depends a bit on the store that you are using,
>> maybe the last confirmed checkpoint id is enough, but maybe you require
>> something more. This transaction information is probably not „by-key“, but
>> „per-operator“, so I would suggest to use operator state (see next answer).
>> Btw the implementation of async operators does something very similar to
>> restore pending requests, and you can see the code in „AsyncWaitOperator".
>> 
>> 
>> However, the async-io operator normally follows other operators, e.g.
>> fold, so it normally faces the DataStream but not KeyedStream, and
>> DataStream only supports ListState, right?
>> 
>> 
>> You can use non-keyed state, aka operator state, to store such information.
>> See here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> . It does not require a KeyedSteam.
>> 
>> Best,
>> Stefan
>> 
>> 
>> 
>> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> 
>> 
>> Am 01.01.2018 um 15:22 schrieb Jinhua Luo <luajit...@gmail.com>:
>> 
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> You need to be a bit careful if your sink needs exactly-once semantics. In
>> this case things should either be idempotent or the db must support rolling
>> back changes between checkpoints, e.g. via transactions. Commits should be
>> triggered for confirmed checkpoints („notifyCheckpointComplete“).
>> 
>> 
>> I doubt if we have a risk here: in notifyCheckpointComplete, the
>> checkpoint was completed, and if the process crashes (or machine
>> failure) before it commits the db, the flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again? correct? if so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>> 
>> 
>> Yes, that is correct. What I was talking about was more the opposite
>> problem,i.e. committing too early. In that case, you could have committed
>> for a checkpoint that failed afterwards, and recovery will start from an
>> earlier checkpoint but with your commit already applied. You should only
>> commit after you received the notification or else your semantics can be
>> down to „at-least-once".
>> 
>> 
>> 

Reply via email to