I think I found a code path (a race between threads) that may lead to two
markers ending up in the list.
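To make the race concrete, here is a minimal, self-contained sketch
(illustrative names only, not the actual consumer classes): inserting the
marker is a non-atomic check-then-act, so two threads can both observe
"no marker present" and both add one:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MarkerRaceSketch {

    // Sentinel entry, analogous to the special "n/a" MARKER partition state.
    static final String MARKER = "MARKER(n/a,-1)";

    static final List<String> partitionStates = new CopyOnWriteArrayList<>();

    // Racy: the check and the add are two separate steps, so two threads
    // can both pass the check before either one has added the marker.
    static void addMarkerIfAbsentRacy() {
        if (!partitionStates.contains(MARKER)) {
            partitionStates.add(MARKER);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread a = new Thread(MarkerRaceSketch::addMarkerIfAbsentRacy);
        Thread b = new Thread(MarkerRaceSketch::addMarkerIfAbsentRacy);
        a.start();
        b.start();
        a.join();
        b.join();
        // Prints 2 on an unlucky interleaving: two markers in the list.
        System.out.println("markers in list: " + partitionStates.size());
    }
}

An atomic operation such as CopyOnWriteArrayList#addIfAbsent, or
synchronizing the check and the add, would close that window.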
I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.

Nico

On 07/03/18 10:09, Mu Kong wrote:
> Hi Gordon,
>
> Thanks for your response.
> I think I've misspoken about the failure after the "n/a" exception.
> The behavior after this exception would be:
>
> switched from RUNNING to CANCELING
> switched from CANCELING to CANCELED
> Try to restart or fail the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx) if no
> longer possible.
> switched from state FAILING to RESTARTING
> Restarting the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx)
> Recovering checkpoints from ZooKeeper
> Found 1 checkpoints in ZooKeeper
> Trying to retrieve checkpoint 1091
> Restoring from latest valid checkpoint: Checkpoint 1091 @
> xxxxxxxxxxxxxxxxxxxx for xxxxxxxxxxxxxxxxxxxx
> switched from CREATED to SCHEDULED
> switched from SCHEDULED to DEPLOYING
> switched from DEPLOYING to RUNNING
> (several checkpoints)
> switched from RUNNING to FAILED
> TimerException{java.io.EOFException: Premature EOF: no length prefix available}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException: Premature EOF: no length prefix available
>     at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
>
> Since there were several successful checkpoints after the restart, I think
> the later failure might be something else.
> Also, could you please share more information about the MARKER in the
> code, i.e. which piece of code I should look at?
>
> And thanks for the suggestion to upgrade Flink to 1.3.2.
>
> Best regards,
> Mu
>
>
> On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai <tzuli...@gmail.com> wrote:
>
>> Hi Mu,
>>
>> You mentioned that the job stopped after the "n/a" topic error, but the
>> job failed to recover.
>> What exception did you encounter in the restarted executions? Was it the
>> same error?
>> This would verify whether we actually should be removing more than one
>> of these special MARKER partition states.
>>
>> On the other hand, if I recall correctly, the Kafka consumer in 1.3.0 had
>> a severe bug that could lead to duplicate data; it was fixed in 1.3.2.
>> Though I don't think it is related to the error you encountered, I
>> strongly recommend that you use 1.3.2 instead.
>>
>> Cheers,
>> Gordon
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
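PS regarding the MARKER question above, and Gordon's point about whether
more than one of these special MARKER partition states should be removed:
purely as a hedged illustration (hypothetical names again, not Flink's
actual code), the difference between removing one occurrence of the
sentinel and draining all of them looks like this. If the race above can
insert the marker twice, only the draining variant is safe:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MarkerCleanupSketch {

    static final String MARKER = "MARKER(n/a,-1)";

    // Fragile: List#remove removes only the first occurrence, so a second
    // marker inserted by the race survives in the partition list.
    static void removeSingleMarker(List<String> states) {
        states.remove(MARKER);
    }

    // Robust: drains every occurrence of the sentinel, tolerating the
    // double-insert race sketched earlier in this thread.
    static void removeAllMarkers(List<String> states) {
        states.removeIf(MARKER::equals);
    }

    public static void main(String[] args) {
        List<String> states = new ArrayList<>(
                Arrays.asList("topicA-0", MARKER, "topicA-1", MARKER));
        removeAllMarkers(states);
        System.out.println(states); // [topicA-0, topicA-1]
    }
}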