Hi Stefan,

Your reply really helps me a lot. Thank you.
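To make sure I understood the fixed pool of transaction ids correctly: each subtask would only ever use ids from its own small, known pool, so after a restart without state it can simply try to abort all of them. Here is a rough, untested sketch of what I have in mind (the pool size, the id scheme and abortInDb() are all just placeholders for whatever the external store offers):

// Rough sketch of the "fixed pool of transaction ids" idea: every subtask only
// ever uses ids from its own small, known pool, so after a restart without any
// state it can simply try to abort all of them. abortInDb() is a placeholder.
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PooledTxnSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private static final int POOL_SIZE = 5;  // ids per subtask, just an assumption

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (!context.isRestored()) {
            // No state to tell us which transactions might still be open, but since the
            // pool of possible ids is fixed we can just try to abort every one of them.
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            for (int i = 0; i < POOL_SIZE; i++) {
                abortInDb(txnId(subtask, i));
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Real code would rotate to the next id of the pool here and persist which
        // ids are still pending a commit.
    }

    @Override
    public void invoke(String value) throws Exception {
        // Write the record under the currently open transaction id (omitted here).
    }

    private static String txnId(int subtask, int slot) {
        return "my-sink-" + subtask + "-" + slot;
    }

    private void abortInDb(String txnId) { /* placeholder: cancel/clean up txnId */ }
}

On top of that I would still add a timeout on the database side, as you suggested, since this cleanup is only best effort.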
2018-01-08 19:38 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
> Hi,
>
> 1. If `snapshotState` failed at the first checkpoint, does it mean there is no state and no transaction can be aborted by default?
>
> This is a general problem and not only limited to the first checkpoint. Whenever you open a transaction, there is no guaranteed way to store it in persistent state so that it can be aborted in case of failure. In theory, your job can crash at any point after you have just opened a transaction. So in the end I guess you must rely on something like a timeout-based mechanism. You can do some _best effort_ to proactively cancel uncommitted transactions through methods like state, listing them in files, or having a fixed pool of transaction ids and iterating over all of them for cancellation on a restart.
>
> 2. I saw FlinkKafkaProducer011 has a transaction id pool, which has multiple ids to be reused by the producer, and it aborts all ids in this pool in `initializeState`. Is this pool designed for the situation in the first problem, or something I haven't noticed?
>
> This implementation is very specific to the KafkaProducer and is not necessarily a good blueprint for what you are planning. In particular, in this case there is a fixed and limited universe of all potential transaction ids that a task can potentially (re)use, so after a restart without state we can simply iterate all possible transaction ids and issue a cancel for all of them. In general, you don't always know all possible transaction ids in a way that allows you to opportunistically cancel all potential orphaned transactions.
>
> 2018-01-04 22:15 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> Yes, that is how it works.
>>
>> > On 04.01.2018 at 14:47, Jinhua Luo <luajit...@gmail.com> wrote:
>> >
>> > The TwoPhaseCommitSinkFunction seems to record the transaction status in the state just like what I imagine above, correct? And if the process fails before the commit, the commit would be triggered again on the later restart, correct? So the commit would not be forgotten, correct?
>> >
>> > 2018-01-03 22:54 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >> I think a mix of async UPDATEs and exactly-once might be tricky, and the typical use case for async IO is more about reads. So let's take a step back: what would you like to achieve with this? Do you want a read-modify-update (e.g. a map function that queries and updates a DB) or just updates (like a sink that goes against a DB)? From the previous question, I assume the second case applies, in which case I wonder why you even need to be async for a sink. I think a much better approach could be based on Flink's TwoPhaseCommitSinkFunction, and maybe use some batching to lower update costs.
>> >>
>> >> On top of the TwoPhaseCommitSinkFunction, you could implement transactions against your DB, similar to e.g. this example with Postgres: http://hlinnaka.iki.fi/2013/04/11/how-to-write-a-java-transaction-manager-that-works-with-postgresql/
>> >>
>> >> Does this help or do you really need async read-modify-update?
>> >>
>> >> Best,
>> >> Stefan
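(Interjecting here to check my understanding of the TwoPhaseCommitSinkFunction suggestion: below is a rough, untested sketch of how I would try it against MySQL, staging rows in a helper table keyed by a transaction id and moving them into the target table on commit. The table names, the JDBC URL and the String payload type are made up for illustration, and the exact super() arguments differ a bit between Flink versions; this follows the newer form with a separate context type.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch only: stages rows in an assumed "staging" table keyed by a transaction id
// and moves them into an assumed "target" table when the checkpoint is confirmed.
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    private static final String URL = "jdbc:mysql://localhost:3306/test";  // assumption

    public MySqlTwoPhaseCommitSink() {
        super(TypeInformation.of(String.class).createSerializer(new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // The "transaction" handle is just an id; rows written under this id stay
        // in the staging table until commit() moves them.
        return UUID.randomUUID().toString();
    }

    @Override
    protected void invoke(String txnId, String value, Context context) throws Exception {
        try (Connection c = DriverManager.getConnection(URL);
             PreparedStatement ps = c.prepareStatement(
                     "INSERT INTO staging (txn_id, payload) VALUES (?, ?)")) {
            ps.setString(1, txnId);
            ps.setString(2, value);
            ps.executeUpdate();
        }
    }

    @Override
    protected void preCommit(String txnId) {
        // Nothing to flush in this sketch; all rows are already in the staging table.
    }

    @Override
    protected void commit(String txnId) {
        // Move staged rows to the target table in one DB transaction. Replaying this
        // after recovery is harmless: the second run finds no staged rows for the id.
        try (Connection c = DriverManager.getConnection(URL)) {
            c.setAutoCommit(false);
            try (PreparedStatement move = c.prepareStatement(
                         "INSERT INTO target (payload) SELECT payload FROM staging WHERE txn_id = ?");
                 PreparedStatement clean = c.prepareStatement(
                         "DELETE FROM staging WHERE txn_id = ?")) {
                move.setString(1, txnId);
                move.executeUpdate();
                clean.setString(1, txnId);
                clean.executeUpdate();
            }
            c.commit();
        } catch (Exception e) {
            throw new RuntimeException("commit of transaction " + txnId + " failed", e);
        }
    }

    @Override
    protected void abort(String txnId) {
        // Best-effort cleanup of rows that belong to an uncommitted transaction.
        try (Connection c = DriverManager.getConnection(URL);
             PreparedStatement ps = c.prepareStatement("DELETE FROM staging WHERE txn_id = ?")) {
            ps.setString(1, txnId);
            ps.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException("abort of transaction " + txnId + " failed", e);
        }
    }
}

If that sketch is roughly right, I would just do stream.addSink(new MySqlTwoPhaseCommitSink()) with checkpointing enabled and get exactly-once ingress into the target table.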
>> >> On 03.01.2018 at 15:08, Jinhua Luo <luajit...@gmail.com> wrote:
>> >>
>> >> No, I mean how to implement an exactly-once db commit (given our async io target is mysql), not the state used by flink.
>> >>
>> >> As mentioned in the previous mail, if I commit to the db in notifyCheckpointComplete, we have a risk of losing data (a lost commit, since a flink restart would not trigger notifyCheckpointComplete for the last checkpoint again). On the other hand, if I update and commit per record, the sql/stored procedure has to handle duplicate updates on failure restart.
>> >>
>> >> So, when or where should we commit so that we get exactly-once db ingress?
>> >>
>> >> 2018-01-03 21:57 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >> Hi,
>> >>
>> >> Then how to implement exactly-once async io? That is, neither missing data nor duplicating data.
>> >>
>> >> From the docs about async IO here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
>> >>
>> >> "Fault Tolerance Guarantees: The asynchronous I/O operator offers full exactly-once fault tolerance guarantees. It stores the records for in-flight asynchronous requests in checkpoints and restores/re-triggers the requests when recovering from a failure."
>> >>
>> >> So it is already handled by Flink in a way that supports exactly-once.
>> >>
>> >> Is there some way to index data by checkpoint id and record which checkpoints have already been committed to the db? But that means we need MapState, right?
>> >>
>> >> The information required depends a bit on the store that you are using; maybe the last confirmed checkpoint id is enough, but maybe you require something more. This transaction information is probably not "by-key" but "per-operator", so I would suggest using operator state (see next answer). Btw, the implementation of async operators does something very similar to restore pending requests, and you can see the code in "AsyncWaitOperator".
>> >>
>> >> However, the async-io operator normally follows other operators, e.g. fold, so it normally faces a DataStream rather than a KeyedStream, and DataStream only supports ListState, right?
>> >>
>> >> You can use non-keyed state, aka operator state, to store such information. See here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#using-managed-operator-state . It does not require a KeyedStream.
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >> 2018-01-03 18:43 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >> On 01.01.2018 at 15:22, Jinhua Luo <luajit...@gmail.com> wrote:
>> >>
>> >> 2017-12-08 18:25 GMT+08:00 Stefan Richter <s.rich...@data-artisans.com>:
>> >>
>> >> You need to be a bit careful if your sink needs exactly-once semantics. In this case things should either be idempotent or the db must support rolling back changes between checkpoints, e.g. via transactions. Commits should be triggered for confirmed checkpoints ("notifyCheckpointComplete").
>> >>
>> >> I wonder if we have a risk here: in notifyCheckpointComplete, the checkpoint was completed, and if the process crashes (or the machine fails) before it commits to the db, flink would restart the app, restoring the state from the last checkpoint, but it would not invoke notifyCheckpointComplete again, correct? If so, we would miss the database ingress for the data between the last two checkpoints, am I correct?
>> >>
>> >> Yes, that is correct.
>> >> What I was talking about was more the opposite problem, i.e. committing too early. In that case, you could have committed for a checkpoint that failed afterwards, and recovery would start from an earlier checkpoint but with your commit already applied. You should only commit after you have received the notification, or else your semantics can be down to "at-least-once".
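Putting these points together, my current plan is to keep the pending transaction ids in non-keyed operator ListState and only commit them once the checkpoint is confirmed, i.e. in notifyCheckpointComplete or on restore. A rough, untested sketch of that plan follows; writeToDb() and commitInDb() are just placeholders for the MySQL side, and commitInDb() is assumed to be idempotent so that re-committing after recovery is safe:

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: commit to the external DB only for confirmed checkpoints, keeping the
// pending transaction ids in operator (non-keyed) ListState so a restart can
// finish commits whose notification was lost.
public class NotifyOnCompleteDbSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // (checkpointId, txnId) pairs that were snapshotted but not yet committed.
    private transient ListState<Tuple2<Long, String>> pendingState;
    private final List<Tuple2<Long, String>> pending = new ArrayList<>();
    private String currentTxnId = UUID.randomUUID().toString();

    @Override
    public void invoke(String value) throws Exception {
        // Write the record under the current transaction id, e.g. into a staging table.
        writeToDb(currentTxnId, value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Close the current transaction, remember it as pending for this checkpoint,
        // and start a fresh one for the next interval.
        pending.add(Tuple2.of(ctx.getCheckpointId(), currentTxnId));
        currentTxnId = UUID.randomUUID().toString();
        pendingState.clear();
        for (Tuple2<Long, String> p : pending) {
            pendingState.add(p);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        pendingState = ctx.getOperatorStateStore().getListState(new ListStateDescriptor<>(
                "pending-txns", TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {})));
        if (ctx.isRestored()) {
            // Anything restored here belongs to a completed checkpoint whose notification
            // may have been lost in the failure, so (re-)commit it.
            for (Tuple2<Long, String> p : pendingState.get()) {
                commitInDb(p.f1);
            }
            pendingState.clear();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Commit only transactions whose checkpoint has been confirmed.
        pending.removeIf(p -> {
            if (p.f0 <= checkpointId) {
                commitInDb(p.f1);
                return true;
            }
            return false;
        });
    }

    private void writeToDb(String txnId, String value) { /* placeholder */ }

    private void commitInDb(String txnId) { /* placeholder, assumed idempotent */ }
}

Transactions opened after the last successful checkpoint are of course not in this state, so for those I would still rely on the best-effort cleanup and timeouts discussed above. Does that match what you meant?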