Thanks for the feedback, and glad I am on the right track.

> Outstanding transactions should be automatically aborted on restart by Flink.

Let me understand this:

1. A Flink pipe is cancelled and has dangling Kafka transactions.
2. A new Flink pipe (not restored from a checkpoint or savepoint) is
   started, which is essentially the same pipe as in 1 but does not restore.

Would the dangling Kafka transactions be aborted? If yes, how does it work?
As in, how does the new pipe know which transactions to abort? Does it ask
Kafka for pending transactions and know which ones belong to the first pipe
(maybe because they share some id derived from the name of the pipe or
something else)?

Thanks again,
Vishal

On Fri, Apr 16, 2021 at 1:37 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Vishal,
>
> I think you pretty much nailed it.
>
> Outstanding transactions should be automatically aborted on restart by
> Flink. Flink (re)uses a pool of transaction ids, such that all possible
> transactions by Flink are canceled on restart.
>
> I guess the biggest downside of using a large transaction timeout is that
> other clients might leak transactions for a longer period of time or that
> they may linger for a longer time if you stop an application entirely (for
> example for an upgrade).
>
> On Fri, Apr 16, 2021 at 4:08 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Hello folks
>>
>> So AFAIK data loss on exactly once will happen if
>>
>> - a transaction is started on Kafka.
>> - pre-commit is done (Kafka is prepared for the commit).
>> - the commit fails (Kafka went down or n/w issue or whatever). Kafka has
>>   an uncommitted transaction.
>> - the pipe was down for say n minutes and the Kafka transaction
>>   timeout is m minutes, where m < n.
>> - the pipe restarts and tries to commit an aborted transaction and
>>   fails, and thus data loss.
>>
>> Thus it is imperative that transaction.max.timeout.ms on Kafka is
>> a high value (like n hours), which should be greater than the SLA for
>> downtime of the pipe.
>> As in, we have to ensure that the pipe is restarted
>> before the transaction.timeout.ms set on the broker expires.
>>
>> The impulse is to make transaction.max.timeout.ms high (24 hours). The
>> only implication is what happens if we start a brand new pipeline on the
>> same topics, which have yet-to-be-resolved transactions, mostly because of
>> the extended timeout of a previous pipe. I would assume we are delayed then,
>> given that Kafka will stall subsequent transactions from being visible to
>> the consumer because of this one outstanding transaction?
>>
>> And if that is the case, then understandably we have to abort those
>> dangling transactions before the 24 hr timeout. While there is probably a
>> way to do that, does Flink help? As in, is there a property we can set that
>> will abort a transaction on Kafka, because we need it to, given the above?
>>
>> Again, I might have totally misunderstood the whole mechanics, and if yes,
>> apologies, and I will appreciate some clarifications.
>>
>> Thanks.
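
The timeout reasoning in the original message can be written down as a small sanity check. This is only a sketch. The function name `check_transaction_timeouts` and the `max_downtime_ms` parameter are made up for illustration, and it assumes the stock Kafka defaults (60000 ms for the producer's `transaction.timeout.ms`, 900000 ms for the broker's `transaction.max.timeout.ms`) when a key is not set. It talks to neither Kafka nor Flink. It only compares the configured numbers.

```python
from typing import List, Mapping, Union

Value = Union[int, str]

# Stock Kafka defaults, used here as assumptions when a key is missing.
DEFAULT_PRODUCER_TXN_TIMEOUT_MS = 60_000   # producer: transaction.timeout.ms
DEFAULT_BROKER_MAX_TXN_TIMEOUT_MS = 900_000  # broker: transaction.max.timeout.ms


def check_transaction_timeouts(
    producer_props: Mapping[str, Value],
    broker_props: Mapping[str, Value],
    max_downtime_ms: int,
) -> List[str]:
    """Return a list of human-readable problems (empty if none are found).

    max_downtime_ms is the hypothetical downtime SLA of the pipe ("n" in the
    thread); the producer's transaction timeout plays the role of "m".
    """
    producer_timeout = int(
        producer_props.get("transaction.timeout.ms", DEFAULT_PRODUCER_TXN_TIMEOUT_MS)
    )
    broker_max = int(
        broker_props.get("transaction.max.timeout.ms", DEFAULT_BROKER_MAX_TXN_TIMEOUT_MS)
    )

    problems = []
    # The broker rejects a transactional producer whose requested timeout
    # exceeds the broker-side cap.
    if producer_timeout > broker_max:
        problems.append(
            f"transaction.timeout.ms={producer_timeout} exceeds broker "
            f"transaction.max.timeout.ms={broker_max}"
        )
    # The m < n case: a pre-committed transaction can time out (and be
    # aborted by Kafka) before the pipe is back to commit it.
    if producer_timeout <= max_downtime_ms:
        problems.append(
            f"transaction.timeout.ms={producer_timeout} does not exceed the "
            f"expected downtime of {max_downtime_ms} ms"
        )
    return problems


if __name__ == "__main__":
    # 24 h producer timeout against an untouched broker, 1 h downtime SLA.
    for problem in check_transaction_timeouts(
        {"transaction.timeout.ms": 24 * 60 * 60 * 1000}, {}, 60 * 60 * 1000
    ):
        print(problem)
```

Raising only one of the two settings is not enough: the producer-side timeout has to exceed the downtime you want to survive, and the broker-side cap has to be at least as large as the producer-side timeout.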