Hi,
with accumulators, do you mean the ones you get from RuntimeContext.addAccumulator/getAccumulator? I’m afraid these are not fault-tolerant, which means that the count in them probably doesn’t reflect the actual number of elements that were processed. When a job fails and restarts, the accumulators start from scratch again. This makes me wonder how your count ever reaches the required 2 million for the job to be considered “done”.
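For illustration, here is a minimal sketch of the kind of setup I have in mind: checkpointing plus a user accumulator registered in a RichMapFunction. The class names and the state backend are made up (the state backend is exactly what I am asking about below); the point is that the accumulator is registered anew in open() whenever the job (re)starts, so its count is not restored from a checkpoint.

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AccumulatorSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000); // 10 s checkpoint interval, as in your test
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints")); // made-up path

        // In the real job this would be the Kafka source feeding the RollingSink.
        env.fromElements("a", "b", "c")
           .map(new CountingMapper())
           .print();

        env.execute("accumulator-sketch");
    }

    public static class CountingMapper extends RichMapFunction<String, String> {
        private transient LongCounter counter;

        @Override
        public void open(Configuration parameters) {
            // Registered again on every (re)start, so the count begins at 0 after a failure.
            counter = new LongCounter();
            getRuntimeContext().addAccumulator("records-written", counter);
        }

        @Override
        public String map(String value) {
            counter.add(1L);
            return value;
        }
    }
}

If you count with something like this, the numbers you see after a restart are not a reliable way to judge whether all records made it through the pipeline.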
This keeps getting more mysterious… By the way, what are you using as StateBackend and checkpoint interval?

Cheers,
Aljoscha

> On 08 Mar 2016, at 13:38, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
> 
> Hi,
> thanks for the fast answer. Answers inline.
> 
>> Am 08.03.2016 um 13:31 schrieb Aljoscha Krettek <aljos...@apache.org>:
>> 
>> Hi,
>> a missing part file for one of the parallel sinks is not necessarily a problem. This can happen if that parallel instance of the sink never received data after the job successfully restarted.
>> 
>> Missing data, however, is a problem. Maybe I need some more information about your setup:
>> 
>> - When are you inspecting the part files?
> Some time after the cluster is shut down.
>> - Do you shut down the Flink job before checking? If so, how do you shut it down?
> Via 'cancel' in the JobManager web interface. Some records seem to be written only after cancelling the job, right?
>> - When do you know whether all the data from Kafka was consumed by Flink and has passed through the pipeline into HDFS?
> I have an accumulator in a map right before writing into HDFS. Also, the RollingSink has a DateTimeBucketer, which makes it apparent when no new data is arriving anymore, as the last bucket is then from some minutes ago.
>> 
>> Cheers,
>> Aljoscha
>>> On 08 Mar 2016, at 13:19, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>> 
>>> Hi Aljoscha,
>>> 
>>> oh I see. I was under the impression this file was used internally and that the output would be completed at the end. Ok, so I extracted the relevant lines using
>>> for i in part-*; do head -c $(cat "_$i.valid-length" | strings) "$i" > "$i.final"; done
>>> which seems to do the trick.
>>> 
>>> Unfortunately, now some records are missing again. In particular, there are the files
>>> part-0-0, part-1-0, ..., part-10-0, part-11-0, each with corresponding .valid-length files
>>> part-0-1, part-1-1, ..., part-10-0
>>> in the bucket, where job parallelism=12. So it looks to us as if one of the files was not even created in the second attempt. This behavior seems to be somewhat reproducible, cf. my earlier email where the part-11 file disappeared as well.
>>> 
>>> Thanks again for your help.
>>> 
>>> Cheers,
>>> Max
>>> —
>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>> 
>>>> Am 08.03.2016 um 11:05 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>> 
>>>> Hi,
>>>> are you taking the “.valid-length” files into account? The problem with doing “exactly-once” with HDFS is that before Hadoop 2.7 it was not possible to truncate files. So the trick we’re using is to write the length up to which a file is valid if we would normally need to truncate it. (If the job fails in the middle of writing, the output files have to be truncated to a valid position.) For example, say you have an output file part-8-0. Now, if there exists a file part-8-0.valid-length, this file tells you up to which position the file part-8-0 is valid. So you should only read up to this point.
>>>> 
>>>> The name of the “.valid-length” suffix can also be configured, by the way, as can all the other stuff.
>>>> 
>>>> If this is not the problem, then I definitely have to investigate further.
>>>> I’ll also look into the Hadoop 2.4.1 build problem.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 08 Mar 2016, at 10:26, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>> 
>>>>> Hi Aljoscha,
>>>>> thanks again for getting back to me. I built from your branch and the exception is not occurring anymore. The RollingSink state can be restored.
>>>>> 
>>>>> Still, the exactly-once guarantee seems not to be fulfilled; there are always some extra records after killing either a task manager or the job manager. Do you have an idea where this behavior might be coming from? (I guess concrete numbers will not help greatly as there are so many parameters influencing them. Still, in our test scenario, we produce 2 million records in a Kafka queue, but in the final output files there are on the order of 2.1 million records, so a 5% error. The job is running in a per-job YARN session with n=3, s=4 and a checkpointing interval of 10s.)
>>>>> 
>>>>> On another (maybe unrelated) note: when I pulled your branch, the Travis build did not go through for -Dhadoop.version=2.4.1. I have not looked into this further as of now; is this one of the tests known to fail sometimes?
>>>>> 
>>>>> Cheers,
>>>>> Max
>>>>> <travis.log>
>>>>> —
>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>> 
>>>>>> Am 07.03.2016 um 17:20 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>>>>> 
>>>>>> Hi Maximilian,
>>>>>> sorry for the delay, we were very busy with the release last week. I had a hunch about the problem, but I think I found a fix now. The problem is in snapshot restore. When restoring, the sink tries to clean up any files that were previously in progress. If Flink restores to the same snapshot twice in a row, it will try to clean up the leftover files twice, but they are not there anymore; this causes the exception.
>>>>>> 
>>>>>> I have a fix in my branch: https://github.com/aljoscha/flink/tree/rolling-sink-fix
>>>>>> 
>>>>>> Could you maybe try whether this solves your problem? Which version of Flink are you using? You would have to build from source to try it out. Alternatively, I could build it and put it onto a Maven snapshot repository for you to try it out.
>>>>>> 
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>> On 03 Mar 2016, at 14:50, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>> 
>>>>>>> Hi,
>>>>>>> did you check whether there are any files at your specified HDFS output location? If yes, which files are there?
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>> On 03 Mar 2016, at 14:29, Maximilian Bode <maximilian.b...@tngtech.com> wrote:
>>>>>>>> 
>>>>>>>> Just for the sake of completeness: this also happens when killing a task manager and is therefore probably unrelated to job manager HA.
>>>>>>>> 
>>>>>>>>> Am 03.03.2016 um 14:17 schrieb Maximilian Bode <maximilian.b...@tngtech.com>:
>>>>>>>>> 
>>>>>>>>> Hi everyone,
>>>>>>>>> 
>>>>>>>>> unfortunately, I am running into another problem trying to establish exactly-once guarantees (Kafka -> Flink 1.0.0-rc3 -> HDFS).
>>>>>>>>> 
>>>>>>>>> When using
>>>>>>>>> 
>>>>>>>>> RollingSink<Tuple3<Integer,Integer,String>> sink = new RollingSink<Tuple3<Integer,Integer,String>>("hdfs://our.machine.com:8020/hdfs/dir/outbound");
>>>>>>>>> sink.setBucketer(new NonRollingBucketer());
>>>>>>>>> output.addSink(sink);
>>>>>>>>> 
>>>>>>>>> and then killing the job manager, the new job manager is unable to restore the old state throwing
>>>>>>>>> ---
>>>>>>>>> java.lang.Exception: Could not restore checkpointed state to operators and functions
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:454)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>>>> at java.lang.Thread.run(Thread.java:744)
>>>>>>>>> Caused by: java.lang.Exception: Failed to restore state to function: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
>>>>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:446)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.lang.RuntimeException: In-Progress file hdfs://our.machine.com:8020/hdfs/dir/outbound/part-5-0 was neither moved to pending nor is still in progress.
>>>>>>>>> at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:686)
>>>>>>>>> at org.apache.flink.streaming.connectors.fs.RollingSink.restoreState(RollingSink.java:122)
>>>>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
>>>>>>>>> ... 4 more
>>>>>>>>> ---
>>>>>>>>> I found a resolved issue [1] concerning Hadoop 2.7.1. We are in fact using 2.4.0 – might this be the same issue?
>>>>>>>>> 
>>>>>>>>> Another thing I could think of is that the job is not configured correctly and there is some sort of timing issue. The checkpoint interval is 10 seconds, everything else was left at default value. Then again, as the NonRollingBucketer is used, there should not be any timing issues, right?
>>>>>>>>> 
>>>>>>>>> Cheers,
>>>>>>>>> Max
>>>>>>>>> 
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-2979
>>>>>>>>> 
>>>>>>>>> —
>>>>>>>>> Maximilian Bode * Junior Consultant * maximilian.b...@tngtech.com
>>>>>>>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>>>>>>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>>>>>>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
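PS: In case it is useful, here is a rough Java equivalent of the shell one-liner quoted above for trimming a part file to its valid length. It is only a sketch: it assumes the default naming for the valid-length files, i.e. a "_" prefix and a ".valid-length" suffix, and it simply strips non-digit bytes from that file (which is what the 'strings' call did). Adapt it if you configured the RollingSink differently.

import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TrimToValidLength {

    public static void main(String[] args) throws Exception {
        // args[0]: the part file, e.g. hdfs://our.machine.com:8020/hdfs/dir/outbound/part-8-0
        Path part = new Path(args[0]);
        Path validLengthFile = new Path(part.getParent(), "_" + part.getName() + ".valid-length");
        FileSystem fs = part.getFileSystem(new Configuration());

        // The valid-length file holds the number of valid bytes; keep only the digits.
        long validLength;
        try (InputStream in = fs.open(validLengthFile)) {
            byte[] buf = new byte[64];
            int n = in.read(buf);
            String digits = new String(buf, 0, n, StandardCharsets.UTF_8).replaceAll("[^0-9]", "");
            validLength = Long.parseLong(digits);
        }

        // Copy only the first validLength bytes into a new ".final" file.
        Path trimmed = new Path(part.getParent(), part.getName() + ".final");
        byte[] buffer = new byte[4096];
        long remaining = validLength;
        try (InputStream in = fs.open(part); OutputStream out = fs.create(trimmed, true)) {
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break;
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        }
    }
}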