Great!! Thanks a lot Kurt

On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <ykt...@gmail.com> wrote:
> Hi, i have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
> will open a PR soon.
>
> Best,
> Kurt
>
> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Thanks a lot Kurt!
>>
>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
>>
>>> Thanks for the test case, i will take a look at it.
>>>
>>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Thursday, Apr 27, 2017 at 03:55:
>>>
>>>> I've created a repository with a unit test to reproduce the error at
>>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>>> (probably this error is also related to FLINK-4719).
>>>>
>>>> The exception is thrown only when there are null strings and multiple
>>>> slots per TM; I don't know whether the UnilateralSortMerger is involved
>>>> or not (but I think so..).
>>>> A quick fix for this problem would be very much appreciated because it's
>>>> blocking a production deployment..
>>>>
>>>> Thanks in advance to all,
>>>> Flavio
>>>>
>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> After digging into the code and tests, I think the problem is almost
>>>>> certainly in the UnilateralSortMerger; there must be a missing
>>>>> synchronization on some shared object somewhere... Right now I'm trying
>>>>> to understand whether this section of code creates some shared object
>>>>> (like the queues) that is accessed in a bad way when there's spilling
>>>>> to disk:
>>>>>
>>>>> // start the thread that reads the input channels
>>>>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
>>>>>     parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>>>
>>>>> // start the thread that sorts the buffers
>>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>>>>
>>>>> // start the thread that handles spilling to secondary storage
>>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>>>>     memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory,
>>>>>     maxNumFileHandles);
>>>>> ....
>>>>> startThreads();
>>>>>
>>>>> The problem is that the unit tests of GroupReduceDriver use Record,
>>>>> testing Rows is not very straightforward, and I'm still trying to
>>>>> reproduce the problem in a local env..
>>>>>
>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Thanks for the explanation. Is there a way to force this behaviour
>>>>>> in a local environment (to try to debug the problem)?
>>>>>>
>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> these files are used for spilling data to disk, in your case sorted
>>>>>>> runs of records. Later, all these sorted runs (up to a fan-out
>>>>>>> threshold) are read and merged to get a completely sorted record
>>>>>>> stream.
>>>>>>>
>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>>> inputchannel file. What are those files used for?
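On the question above about forcing this behaviour in a local environment: a minimal sketch of one possible setup, assuming the local mini cluster honors the taskmanager.memory.size and taskmanager.numberOfTaskSlots keys passed via a Configuration (this may depend on the Flink version); the concrete values are arbitrary examples.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ForceSpillingLocally {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep the managed-memory budget tiny (value in MB) so sorting spills to disk
        // even for small inputs; this is the same key Fabian suggests tuning below.
        conf.setString("taskmanager.memory.size", "32");
        // Several slots in one JVM, to mimic the multi-slot TaskManager setup from the report.
        conf.setInteger("taskmanager.numberOfTaskSlots", 4);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
        env.setParallelism(4);

        // ... build and execute the job under test against this environment ...
    }
}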
>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>>>>>>
>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>>> ... 5 more
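The deepest cause above ends in RowSerializer delegating to StringSerializer and StringValue.readString, which lines up with the earlier observation that the failure only shows up when the Rows contain null strings. For reference, a minimal single-threaded round trip of such a Row — a sketch against the Flink 1.2 classes named in the trace, not the TestDataInputDeserializer test from the linked repository:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

public class RowNullRoundTrip {

    public static void main(String[] args) throws Exception {
        // A Row with two String fields, the second one null -- the pattern reported
        // to trigger the error under spilling.
        Row row = new Row(2);
        row.setField(0, "some value");
        row.setField(1, null);

        // The same serializer chain the runtime uses: a RowSerializer wrapping StringSerializers.
        RowTypeInfo typeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());

        // Serialize ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(row, new DataOutputViewStreamWrapper(bytes));

        // ... and deserialize again; the copy should contain both fields, null included.
        Row copy = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy);
    }
}

A round trip like this is expected to succeed in isolation; the open question in the thread is why the same serializer chain misbehaves once the sorter spills with multiple slots per TaskManager.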
>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:
>>>>>>>>>
>>>>>>>>>> In fact, the old problem was a missed KryoSerializer initialization on
>>>>>>>>>> the exception that triggers spilling to disk. This would leave a dirty
>>>>>>>>>> serialization buffer that would eventually break the program. Till worked
>>>>>>>>>> on it by debugging the source code generating the error. Perhaps someone
>>>>>>>>>> could try the same this time, if Flavio can make the problem reproducible
>>>>>>>>>> with a shareable program and data.
>>>>>>>>>>
>>>>>>>>>> Stefano
>>>>>>>>>>
>>>>>>>>>> From: Stephan Ewen [mailto:se...@apache.org]
>>>>>>>>>> Sent: Friday, April 21, 2017 10:04 AM
>>>>>>>>>> To: user <user@flink.apache.org>
>>>>>>>>>> Subject: Re: UnilateralSortMerger error (again)
>>>>>>>>>>
>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>>
>>>>>>>>>> What types are you using at that point? The stack trace reveals Row and
>>>>>>>>>> StringValue; any other involved types?
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>>>>>>>>> spilling to disk) and the job failed almost immediately..
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I debugged the process a bit by repeating the job on sub-slices of the
>>>>>>>>>> entire data (using the id value to filter data with Parquet push-down
>>>>>>>>>> filters) and all slices completed successfully :(
>>>>>>>>>>
>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>>>>>>>>> if this was somehow a stress factor, but it didn't cause any error.
>>>>>>>>>>
>>>>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>>>>> error showed up again.
>>>>>>>>>>
>>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>>> understand what's going on :(
>>>>>>>>>>
>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>>
>>>>>>>>>> 4 TMs with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>>
>>>>>>>>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>
>>>>>>>>>> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>>>>>>>>>>
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>>>>
>>>>>>>>>> Any help is appreciated..
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>>>>> problem... how confident are you about it?
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there has been
>>>>>>>>>> some recent change.
>>>>>>>>>>
>>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>>>>>>> reproducible?
>>>>>>>>>>
>>>>>>>>>> Cheers
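On Stephan's question about the involved types: one way to check which serializers the runtime actually uses is to print the TypeInformation of the DataSet feeding the failing GroupReduce and the serializer derived from it. The RowTypeInfo below (an id column plus a nullable String column) is only an illustrative stand-in; on the real job one would call getType() on the actual DataSet.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class InspectTypes {

    public static void main(String[] args) {
        // Illustrative stand-in for the job's record type: an id plus a nullable String column.
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // Prints the composite type and the serializer the runtime derives from it
        // (a RowSerializer wrapping the per-field serializers), i.e. the classes
        // that show up in the stack traces above.
        System.out.println(rowType);
        System.out.println(rowType.createSerializer(new ExecutionConfig()));

        // On the real job: dataSet.getType() gives the actual TypeInformation, and
        // dataSet.getType().createSerializer(env.getConfig()) the serializer in use.
    }
}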
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>> I think I'm again hitting the weird Exception with the
>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>>>>>>>>> disk, but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>>> terminated due to an exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Flavio
>>>>
>>>> --
>>> Best,
>>> Kurt