Hi Aljoscha,

thank you very much. I will check whether this fixes the problem and get back to you. I am using 1.0.0 as of today :)
Cheers,
Max

--
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082

> On 07.03.2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi Maximilian,
> sorry for the delay, we were very busy with the release last week. I had a
> hunch about the problem, and I think I have found a fix now. The problem is
> in snapshot restore. When restoring, the sink tries to clean up any files
> that were previously in progress. If Flink restores to the same snapshot
> twice in a row, it tries to clean up the leftover files a second time, but
> they are no longer there, and this causes the exception.
>
> I have a fix in my branch:
> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>
> Could you maybe try whether this solves your problem? Which version of Flink
> are you using? You would have to build from source to try it out.
> Alternatively, I could build it and put it onto a Maven snapshot repository
> for you to try it out.
>
> Cheers,
> Aljoscha
>
>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi,
>> did you check whether there are any files at your specified HDFS output
>> location? If yes, which files are there?
>>
>> Cheers,
>> Aljoscha
>>
>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com>
>>> wrote:
>>>
>>> Just for the sake of completeness: this also happens when killing a task
>>> manager and is therefore probably unrelated to job manager HA.
>>>
>>>> On 03.03.2016, at 14:17, Maximilian Bode
>>>> <maximilian.b...@tngtech.com> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> unfortunately, I am running into another problem trying to establish
>>>> exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>
>>>> When using
>>>>
>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new
>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>> sink.setBucketer(new NonRollingBucketer());
>>>> output.addSink(sink);
>>>>
>>>> and then killing the job manager, the new job manager is unable to restore
>>>> the old state, throwing
>>>> ---
>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>     ... 3 more
>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>     at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>     ... 4 more
>>>> ---
>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using
>>>> 2.4.0. Might this be the same issue?
>>>>
>>>> Another thing I could think of is that the job is not configured correctly
>>>> and there is some sort of timing issue. The checkpoint interval is 10
>>>> seconds; everything else was left at its default value. Then again, as the
>>>> NonRollingBucketer is used, there should not be any timing issues, right?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
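The failure Aljoscha describes (the restore step is not idempotent, so restoring the same checkpoint twice finds the in-progress file already gone) can be illustrated with a small in-memory model. This is a hypothetical sketch, not Flink's RollingSink code and not the fix in the linked branch: the class ToyRollingSinkRestore, its "_part.in-progress" / "_part.pending" naming scheme, and the set-of-strings "file system" are assumptions for illustration. The real sink also truncates the part file (or records its valid length), which is omitted here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical toy model, NOT Flink's RollingSink: a checkpoint records the
// name of the part file being written; on restore that file may exist as
// in-progress, pending, or (after an earlier restore) already final.
class ToyRollingSinkRestore {
    // Simulated file system: the set of paths that currently exist.
    final Set<String> files = new HashSet<>();

    // Assumed naming scheme, for illustration only.
    static String inProgress(String part) { return "_" + part + ".in-progress"; }
    static String pending(String part) { return "_" + part + ".pending"; }

    private void rename(String from, String to) {
        files.remove(from);
        files.add(to);
    }

    // Non-idempotent restore: a second restore of the same checkpoint finds
    // neither the pending nor the in-progress file and throws.
    void restoreStrict(String part) {
        if (files.contains(pending(part))) {
            rename(pending(part), part);
        } else if (files.contains(inProgress(part))) {
            rename(inProgress(part), part);
        } else {
            throw new IllegalStateException("In-Progress file " + part
                    + " was neither moved to pending nor is still in progress.");
        }
    }

    // Idempotent restore: if the part file is already final, an earlier
    // restore of this checkpoint handled it, so there is nothing left to do.
    void restoreIdempotent(String part) {
        if (files.contains(pending(part))) {
            rename(pending(part), part);
        } else if (files.contains(inProgress(part))) {
            rename(inProgress(part), part);
        } else if (!files.contains(part)) {
            // Toy-model choice: a part file missing in every state is still an error.
            throw new IllegalStateException("Part file " + part + " is missing.");
        }
    }

    public static void main(String[] args) {
        ToyRollingSinkRestore sink = new ToyRollingSinkRestore();
        sink.files.add(inProgress("part-5-0"));

        sink.restoreStrict("part-5-0"); // first restore: in-progress -> final
        try {
            sink.restoreStrict("part-5-0"); // same checkpoint restored again
        } catch (IllegalStateException e) {
            System.out.println("strict restore failed: " + e.getMessage());
        }

        sink.restoreIdempotent("part-5-0"); // tolerated: file is already final
        System.out.println("files after restores: " + sink.files);
    }
}
```

In main, the first restore finalizes the in-progress file, a second restore of the same checkpoint fails in the strict variant with the same kind of message as in the stack trace, and the idempotent variant treats an already-final file as done. Making each restore step safe to repeat is the design choice this sketch illustrates.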