Hi Aljoscha,

thank you very much, I will try if this fixes the problem and get back to you. 
I am using 1.0.0 as of today :)

Cheers,
 Max
—
Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Maximilian,
> sorry for the delay, we where very busy with the release last week. I had a 
> hunch about the problem but I think I found a fix now. The problem is in 
> snapshot restore. When restoring, the sink tries to clean up any files that 
> where previously in progress. If Flink restores to the same snapshot twice in 
> a row then it will try to clean up the leftover files twice but they are not 
> there anymore, this causes the exception.
> 
> I have a fix in my branch: 
> https://github.com/aljoscha/flink/tree/rolling-sink-fix
> 
> Could you maybe try if this solves your problem? Which version of Flink are 
> you using? You would have to build from source to try it out. Alternatively I 
> could build it and put it onto a maven snapshot repository for you to try it 
> out.
> 
> Cheers,
> Aljoscha
>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> did you check whether there are any files at your specified HDFS output 
>> location? If yes, which files are there?
>> 
>> Cheers,
>> Aljoscha
>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> 
>>> wrote:
>>> 
>>> Just for the sake of completeness: this also happens when killing a task 
>>> manager and is therefore probably unrelated to job manager HA.
>>> 
>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode 
>>>> <maximilian.b...@tngtech.com>:
>>>> 
>>>> Hi everyone,
>>>> 
>>>> unfortunately, I am running into another problem trying to establish 
>>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>> 
>>>> When using
>>>> 
>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>> sink.setBucketer(new NonRollingBucketer());
>>>> output.addSink(sink);
>>>> 
>>>> and then killing the job manager, the new job manager is unable to restore 
>>>> the old state throwing
>>>> ---
>>>> java.lang.Exception: Could not restore checkpointed state to operators and 
>>>> functions
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>    at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.Exception: Failed to restore state to function: 
>>>> In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 
>>>> was neither moved to pending nor is still in progress.
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>    at 
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>    ... 3 more
>>>> Caused by: java.lang.RuntimeException: In-Progress file 
>>>> hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved 
>>>> to pending nor is still in progress.
>>>>    at 
>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>    at 
>>>> org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>    at 
>>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>    ... 4 more
>>>> ---
>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 
>>>> 2.4.0 – might this be the same issue?
>>>> 
>>>> Another thing I could think of is that the job is not configured correctly 
>>>> and there is some sort of timing issue. The checkpoint interval is 10 
>>>> seconds, everything else was left at default value. Then again, as the 
>>>> NonRollingBucketer is used, there should not be any timing issues, right?
>>>> 
>>>> Cheers,
>>>> Max
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>> 
>>>> —
>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>> 
>>> 
>> 
> 

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to