I think I found a code path (a race between threads) that may lead to two
markers being in the list.

I created https://issues.apache.org/jira/browse/FLINK-8896 to track this
and will have a pull request ready (probably) today.
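
The suspected failure mode can be sketched in a few lines (a minimal,
self-contained illustration with hypothetical names — this is not the actual
Flink consumer code): if two threads race and each insert the special MARKER
into the partition-state list, then removing only the first occurrence leaves
a stale marker behind.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MarkerSketch {
    // Hypothetical sentinel, standing in for the consumer's special
    // MARKER partition state (illustrative name, not the Flink code).
    static final String MARKER = "MARKER";

    // Buggy variant: List.remove(Object) drops only the FIRST occurrence,
    // so a duplicate inserted by a racing thread survives.
    static List<String> removeMarkerOnce(List<String> states) {
        List<String> copy = new ArrayList<>(states);
        copy.remove(MARKER);
        return copy;
    }

    // Defensive variant: drop EVERY occurrence of the marker.
    static List<String> removeAllMarkers(List<String> states) {
        List<String> copy = new ArrayList<>(states);
        copy.removeAll(Collections.singleton(MARKER));
        return copy;
    }

    public static void main(String[] args) {
        // Under the suspected race, two threads each insert the marker.
        List<String> states = Arrays.asList(MARKER, MARKER, "topic-0");

        // prints "true": a stale marker is left behind
        System.out.println(removeMarkerOnce(states).contains(MARKER));
        // prints "false": all markers removed
        System.out.println(removeAllMarkers(states).contains(MARKER));
    }
}
```

Whether the proper fix is to remove all occurrences or to prevent the
duplicate insert in the first place is exactly what the JIRA issue above
will sort out.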


Nico

On 07/03/18 10:09, Mu Kong wrote:
> Hi Gordon,
> 
> Thanks for your response.
> I think I've misspoken about the failure after the "n/a" exception.
> The behavior after this exception would be:
> 
> switched from RUNNING to CANCELING
> switched from CANCELING to CANCELED
> Try to restart or fail the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx) if no
> longer possible.
> switched from state FAILING to RESTARTING
> Restarting the job "XXXXXXXXX" (xxxxxxxxxxxxxxxxxxxx)
> Recovering checkpoints from ZooKeeper
> Found 1 checkpoints in ZooKeeper
> Trying to retrieve checkpoint 1091
> Restoring from latest valid checkpoint: Checkpoint 1091 @
> xxxxxxxxxxxxxxxxxxxx for xxxxxxxxxxxxxxxxxxxx
> switched from CREATED to SCHEDULED
> switched from SCHEDULED to DEPLOYING
> switched from DEPLOYING to RUNNING
> (several successful checkpoints)
> switched from RUNNING to FAILED
> TimerException{java.io.EOFException: Premature EOF: no length prefix available}
>         at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:219)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException: Premature EOF: no length prefix available
>         at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2282)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1347)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
>         at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> 
> Since there were several successful checkpoints after the restart, I think
> the later failure might be something else.
> Also, could you please share more information about the MARKER in the
> code, e.g. which piece of code I should look at?
> 
> And thanks for the suggestion to upgrade Flink to 1.3.2.
> 
> Best regards,
> Mu
> 
> 
> On Wed, Mar 7, 2018 at 3:04 PM, Tzu-Li Tai <tzuli...@gmail.com
> <mailto:tzuli...@gmail.com>> wrote:
> 
>     Hi Mu,
> 
>     You mentioned that the job stopped after the "n/a" topic error, but
>     then failed to recover.
>     What exception did you encounter in the restart executions? Was it
>     the same error?
>     This would help verify whether we actually should be removing more
>     than one of these special MARKER partition states.
> 
>     On the other hand, if I recall correctly, the Kafka consumer had a
>     severe bug in 1.3.0 which could lead to potential duplicate data; it
>     was fixed in 1.3.2. Though I don't think it is related to the error
>     you encountered, I strongly recommend that you use 1.3.2 instead.
> 
>     Cheers,
>     Gordon
> 
> 
> 
>     --
>     Sent from:
>     http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 
> 
