I think mixing async UPDATES with exactly-once semantics might be tricky, and 
the typical use case for async IO is more about reads. So let's take a step 
back: what would you like to achieve with this? Do you want a 
read-modify-update (e.g. a map function that queries and updates a DB) or just 
updates (like a sink that writes to a DB)? From the previous question, 
I assume the second case applies, in which case I wonder why the sink even 
needs to be async. I think a much better approach could be based on Flink's 
TwoPhaseCommitSinkFunction, possibly with some batching to lower update 
costs.
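
To make the batching idea a bit more concrete, here is a minimal sketch in 
plain Java. It is only a model of the lifecycle and does not extend Flink's 
TwoPhaseCommitSinkFunction; the method names loosely mirror its hooks, and 
InMemoryDb, BatchingTwoPhaseSink, abortOpen and pendingCount are hypothetical 
names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the external DB; a real sink would use a DB client.
class InMemoryDb {
    final List<String> committed = new ArrayList<>();
}

// Plain-Java model of a batching two-phase-commit sink (not a Flink class).
class BatchingTwoPhaseSink {
    private final InMemoryDb db;
    // Records buffered since the last checkpoint barrier (the open transaction).
    private List<String> open = new ArrayList<>();
    // Pre-committed batches, keyed by checkpoint id, waiting for confirmation.
    private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

    BatchingTwoPhaseSink(InMemoryDb db) {
        this.db = db;
    }

    // Called per record: only buffer, nothing becomes visible in the DB yet.
    void invoke(String record) {
        open.add(record);
    }

    // Called when the checkpoint is taken: seal the batch, start a new one.
    void preCommit(long checkpointId) {
        pending.put(checkpointId, open);
        open = new ArrayList<>();
    }

    // Called once the checkpoint is confirmed: apply every sealed batch
    // that belongs to this checkpoint or an earlier one.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = pending.headMap(checkpointId, true);
        for (List<String> batch : ready.values()) {
            db.committed.addAll(batch);
        }
        ready.clear(); // clearing the view also removes the entries from pending
    }

    // Called on failure: drop records that are not covered by any checkpoint;
    // they would be replayed from the source after recovery.
    void abortOpen() {
        open.clear();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this model the DB sees one bulk write per confirmed checkpoint instead of 
one write per record, and nothing becomes visible before the checkpoint is 
confirmed.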

On top of the TwoPhaseCommitSinkFunction, you could implement transactions 
against your DB, similar to e.g. this example with Postgres: 
http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
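
As a rough illustration of why DB-side transactions help with the "lost 
commit" concern, here is a small plain-Java model. It assumes the DB can keep 
a prepared transaction under an id and that committing by id is idempotent. 
PreparedTxnDb, RecoveringCommitter and the "chk-7" id format are hypothetical 
and not taken from Flink or from any DB driver.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a DB that supports prepared transactions.
class PreparedTxnDb {
    final List<String> rows = new ArrayList<>();
    private final Map<String, List<String>> prepared = new HashMap<>();

    // Phase 1: make the batch durable under an id, but not yet visible.
    void prepare(String txnId, List<String> batch) {
        prepared.put(txnId, new ArrayList<>(batch));
    }

    // Phase 2: idempotent commit. An unknown or already committed id is a
    // no-op, so retrying after a crash cannot duplicate rows.
    boolean commitPrepared(String txnId) {
        List<String> batch = prepared.remove(txnId);
        if (batch == null) {
            return false;
        }
        rows.addAll(batch);
        return true;
    }

    // Discard a prepared transaction whose checkpoint never completed.
    void rollbackPrepared(String txnId) {
        prepared.remove(txnId);
    }
}

class RecoveringCommitter {
    // After a restart, re-issue the commit for every transaction id that was
    // stored in the restored checkpoint. Returns how many were applied now.
    static int recover(PreparedTxnDb db, List<String> restoredTxnIds) {
        int applied = 0;
        for (String id : restoredTxnIds) {
            if (db.commitPrepared(id)) {
                applied++;
            }
        }
        return applied;
    }
}
```

The point of the sketch: if the job crashes between the checkpoint and the 
commit, the transaction id is still part of the restored state, so the commit 
can simply be retried, and retrying a commit that already went through changes 
nothing.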

Does this help or do you really need async read-modify-update?

Best,
Stefan

> On 03.01.2018 at 15:08, Jinhua Luo <luajit...@gmail.com> wrote:
> 
> No, I mean how to implement an exactly-once db commit (given that our async
> io target is mysql), not the state used by flink.
> As mentioned in the previous mail, if I commit to the db in
> notifyCheckpointComplete, we risk losing data (a lost commit, since a
> flink restart would not trigger notifyCheckpointComplete for the
> last checkpoint again).
> On the other hand, if I update and commit per record, the sql/stored
> procedure has to handle duplicate updates after a failure restart.
> 
> So, when or where should we commit so that we get exactly-once db ingress?
> 
> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> Hi,
>> 
>> 
>> Then how to implement exactly-once async io? That is, neither missing
>> nor duplicating data.
>> 
>> 
>> From the docs about async IO here
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> :
>> 
>> "Fault Tolerance Guarantees:
>> The asynchronous I/O operator offers full exactly-once fault tolerance
>> guarantees. It stores the records for in-flight asynchronous requests in
>> checkpoints and restores/re-triggers the requests when recovering from a
>> failure."
>> 
>> So it is already handled by Flink in a way that supports exactly-once.
>> 
>> Is there some way to index data by checkpoint id and record which
>> checkpoints were already committed to the db? But that means we need
>> MapState, right?
>> 
>> 
>> The information required depends a bit on the store that you are using;
>> maybe the last confirmed checkpoint id is enough, but maybe you require
>> something more. This transaction information is probably not "by-key", but
>> "per-operator", so I would suggest using operator state (see next answer).
>> Btw, the implementation of async operators does something very similar to
>> restore pending requests, and you can see the code in "AsyncWaitOperator".
>> 
>> 
>> However, the async-io operator normally follows other operators, e.g.
>> fold, so it normally operates on a DataStream rather than a KeyedStream,
>> and DataStream only supports ListState, right?
>> 
>> 
>> You can use non-keyed state, aka operator state, to store such information.
>> See here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state
>> . It does not require a KeyedStream.
>> 
>> Best,
>> Stefan
>> 
>> 
>> 
>> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> 
>> 
>> On 01.01.2018 at 15:22, Jinhua Luo <luajit...@gmail.com> wrote:
>> 
>> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> 
>> You need to be a bit careful if your sink needs exactly-once semantics. In
>> this case things should either be idempotent or the db must support rolling
>> back changes between checkpoints, e.g. via transactions. Commits should be
>> triggered for confirmed checkpoints ("notifyCheckpointComplete").
>> 
>> 
>> I suspect we have a risk here: in notifyCheckpointComplete, the
>> checkpoint has completed, and if the process crashes (or the machine
>> fails) before it commits to the db, flink would restart the app,
>> restoring the state from the last checkpoint, but it would not invoke
>> notifyCheckpointComplete again, correct? If so, we would miss the
>> database ingress for the data between the last two checkpoints, am I
>> correct?
>> 
>> 
>> Yes, that is correct. What I was talking about was more the opposite
>> problem, i.e. committing too early. In that case, you could have committed
>> for a checkpoint that failed afterwards, and recovery will start from an
>> earlier checkpoint but with your commit already applied. You should only
>> commit after you have received the notification, or else your semantics
>> can degrade to "at-least-once".
>> 
>> 
