Hi,
by accumulator, do you mean the ones you get from 
RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not 
fault-tolerant, which means that their count probably doesn’t reflect the 
actual number of elements that were processed. When a job fails and restarts, 
the accumulators should start from scratch. This makes me wonder how yours ever 
reach the required 2 million for the job to be considered “done”.

This keeps getting more mysterious… 

By the way, what are you using as StateBackend and checkpoint interval?

Cheers,
Aljoscha
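
For the “.valid-length” files discussed in the quoted messages below, here is a
minimal Java sketch of reading a part file only up to its valid length. It is
not Flink code. It assumes the files have already been copied to a local
directory, that the length file holds the length as decimal digits (possibly
surrounded by a few non-printable bytes, which would explain the `strings`
call in the shell command below), and that the file names follow that shell
command. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of Flink: keeps only the valid prefix of a part file.
final class ValidLengthTruncator {

    // Parses the valid length, skipping every byte that is not an ASCII digit
    // (assumption: the length is stored as decimal text).
    public static long readValidLength(Path lengthFile) throws IOException {
        StringBuilder digits = new StringBuilder();
        for (byte b : Files.readAllBytes(lengthFile)) {
            if (b >= '0' && b <= '9') {
                digits.append((char) b);
            }
        }
        if (digits.length() == 0) {
            throw new IOException("No length found in " + lengthFile);
        }
        return Long.parseLong(digits.toString());
    }

    // Copies the first N bytes of partFile to target, where N comes from lengthFile.
    public static void copyValidPrefix(Path partFile, Path lengthFile, Path target)
            throws IOException {
        long remaining = readValidLength(lengthFile);
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(partFile);
             OutputStream out = Files.newOutputStream(target)) {
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break; // part file is shorter than the recorded length
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo with made-up data: two complete records followed by a partial one.
        Path dir = Files.createTempDirectory("valid-length-demo");
        Path part = dir.resolve("part-8-0");
        Path length = dir.resolve("_part-8-0.valid-length");
        Path result = dir.resolve("part-8-0.final");
        Files.write(part, "record-1\nrecord-2\npartial".getBytes(StandardCharsets.UTF_8));
        Files.write(length, "18".getBytes(StandardCharsets.UTF_8));

        copyValidPrefix(part, length, result);
        System.out.print(new String(Files.readAllBytes(result), StandardCharsets.UTF_8));
    }
}
```

Part files without a corresponding length file would simply be read in full, so
a real script would check for the length file first.
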
> On 08 Mar 2016, at 13:38, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi,
> thanks for the fast answer. Answers inline.
> 
>> On 08 Mar 2016, at 13:31, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Hi,
>> a missing part file for one of the parallel sinks is not necessarily a 
>> problem. This can happen if that parallel instance of the sink never 
>> received data after the job successfully restarted.
>> 
>> Missing data, however, is a problem. Maybe I need some more information 
>> about your setup:
>> 
>> - When are you inspecting the part files?
> Some time after the cluster is shut down
>> - Do you shut down the Flink job before checking? If so, how do you shut it 
>> down?
> Via 'cancel' in the Jobmanager Web Interface. Some records seem to be written 
> only after cancelling the job, right?
>> - When do you know whether all the data from Kafka was consumed by Flink and 
>> has passed through the pipeline into HDFS?
> I have an accumulator in a map right before writing into HDFS. Also, the 
> RollingSink has a DateTimeBucketer, which makes it transparent when no new 
> data is arriving anymore, as the last bucket is from some minutes ago.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> 
>>> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> oh I see. I was under the impression that this file was used internally and 
>>> that the output was completed at the end. Ok, so I extracted the relevant 
>>> lines using
>>>     for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>>> which seems to do the trick.
>>> 
>>> Unfortunately, now some records are missing again. In particular, there are 
>>> the files
>>>     part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
>>>     part-0-1, part-1-1, ..., part-10-1
>>> in the bucket, where job parallelism=12. So it looks to us as if one of the 
>>> files was not even created in the second attempt. This behavior seems to be 
>>> somewhat reproducible, cf. my earlier email where the part-11 file 
>>> disappeared as well.
>>> 
>>> Thanks again for your help.
>>> 
>>> Cheers,
>>> Max
>>> —
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Registered office: Unterföhring * Amtsgericht München (Munich District Court) * HRB 135082
>>> 
>>>> On 08 Mar 2016, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>> 
>>>> Hi,
>>>> are you taking the “.valid-length” files into account? The problem with 
>>>> doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not 
>>>> possible to truncate files. So the trick we’re using is to write the 
>>>> length up to which a file is valid if we would normally need to truncate 
>>>> it. (If the job fails in the middle of writing, the output files have to be 
>>>> truncated to a valid position.) For example, say you have an output file 
>>>> part-8-0. Now, if there exists a file part-8-0.valid-length this file 
>>>> tells you up to which position the file part-8-0 is valid. So you should 
>>>> only read up to this point.
>>>> 
>>>> The name of the “.valid-length” suffix can also be configured, by the way, 
>>>> as can all the other stuff.
>>>> 
>>>> If this is not the problem then I definitely have to investigate further. 
>>>> I’ll also look into the Hadoop 2.4.1 build problem.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> 
>>>>> wrote:
>>>>> 
>>>>> Hi Aljoscha,
>>>>> thanks again for getting back to me. I built from your branch and the 
>>>>> exception is not occurring anymore. The RollingSink state can be restored.
>>>>> 
>>>>> Still, the exactly-once guarantee does not seem to be fulfilled: there are 
>>>>> always some extra records after killing either a task manager or the job 
>>>>> manager. Do you have an idea where this behavior might be coming from? (I 
>>>>> guess concrete numbers will not help greatly as there are so many 
>>>>> parameters influencing them. Still, in our test scenario, we produce 2 
>>>>> million records in a Kafka queue but in the final output files there are 
>>>>> on the order of 2.1 million records, so a 5% error. The job is running in 
>>>>> a per-job YARN session with n=3, s=4 with a checkpointing interval of 
>>>>> 10s.)
>>>>> 
>>>>> On another (maybe unrelated) note: when I pulled your branch, the Travis 
>>>>> build did not go through for -Dhadoop.version=2.4.1. I have not looked 
>>>>> into this further as of now. Is this one of the tests known to fail 
>>>>> sometimes?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> <travis.log>
>>>>> 
>>>>>> On 07 Mar 2016, at 17:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>> 
>>>>>> Hi Maximilian,
>>>>>> sorry for the delay, we were very busy with the release last week. I 
>>>>>> had a hunch about the problem, and I think I have found a fix now. The 
>>>>>> problem is in snapshot restore. When restoring, the sink tries to clean 
>>>>>> up any files that were previously in progress. If Flink restores to the 
>>>>>> same snapshot twice in a row, it will try to clean up the leftover files 
>>>>>> twice, but they are not there anymore, which causes the exception.
>>>>>> 
>>>>>> I have a fix in my branch: 
>>>>>> https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>> 
>>>>>> Could you maybe check whether this solves your problem? Which version of 
>>>>>> Flink are you using? You would have to build from source to try it out. 
>>>>>> Alternatively, I could build it and put it onto a Maven snapshot 
>>>>>> repository for you to try it out.
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> did you check whether there are any files at your specified HDFS output 
>>>>>>> location? If yes, which files are there?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode 
>>>>>>>> <maximilian.b...@tngtech.com> wrote:
>>>>>>>> 
>>>>>>>> Just for the sake of completeness: this also happens when killing a 
>>>>>>>> task manager and is therefore probably unrelated to job manager HA.
>>>>>>>> 
>>>>>>>>> On 03 Mar 2016, at 14:17, Maximilian Bode 
>>>>>>>>> <maximilian.b...@tngtech.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi everyone,
>>>>>>>>> 
>>>>>>>>> unfortunately, I am running into another problem trying to establish 
>>>>>>>>> exactly once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>>>> 
>>>>>>>>> When using
>>>>>>>>> 
>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new 
>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>>> output.addSink(sink);
>>>>>>>>> 
>>>>>>>>> and then killing the job manager, the new job manager is unable to 
>>>>>>>>> restore the old state throwing
>>>>>>>>> ---
>>>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>>>>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>>>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>>>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>>       at java.lang.Thread.run(Thread.java:744)
>>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>>>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>>>       ... 3 more
>>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>>       at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>>>       at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>>>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>>>       ... 4 more
>>>>>>>>> ---
>>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact 
>>>>>>>>> using 2.4.0 – might this be the same issue?
>>>>>>>>> 
>>>>>>>>> Another thing I could think of is that the job is not configured 
>>>>>>>>> correctly and there is some sort of timing issue. The checkpoint 
>>>>>>>>> interval is 10 seconds, everything else was left at default value. 
>>>>>>>>> Then again, as the NonRollingBucketer is used, there should not be 
>>>>>>>>> any timing issues, right?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>>> 