Hi, I have found the bug: https://issues.apache.org/jira/browse/FLINK-6398. I will open a PR soon.
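In case it helps in the meantime, here is a minimal, self-contained sketch of the kind of problem the thread has been circling around: a serializer that keeps mutable per-record state and whose duplicate() hands back the same instance is not safe to share between the SortMerger's reading thread and the task's main thread. The class and field names below are purely illustrative (this is not the actual Flink code, and not necessarily the exact fix in the PR); it only reproduces the symptom of shared serializer state getting corrupted under concurrency.

// Hypothetical illustration only (simplified names, not Flink's real classes): a
// "serializer" that keeps a mutable null mask as instance state and returns itself
// from duplicate(). Two threads sharing it overwrite each other's mask, the kind of
// corruption that only shows up with null fields and multiple slots per TM.
import java.util.Arrays;

public class SharedSerializerSketch {

    static final class NullMaskSerializer {
        private final boolean[] nullMask;                // per-record mutable state

        NullMaskSerializer(int arity) {
            this.nullMask = new boolean[arity];
        }

        NullMaskSerializer duplicate() {
            return this;                                 // broken: state stays shared
        }

        NullMaskSerializer duplicateSafely() {
            return new NullMaskSerializer(nullMask.length); // correct: independent state
        }

        boolean[] computeNullMask(String[] row) {
            for (int i = 0; i < row.length; i++) {
                nullMask[i] = (row[i] == null);          // writes the shared state
            }
            return Arrays.copyOf(nullMask, nullMask.length);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NullMaskSerializer taskSerializer = new NullMaskSerializer(2);
        NullMaskSerializer readerSerializer = taskSerializer.duplicate(); // same object!

        Runnable taskWork = () -> {
            for (int i = 0; i < 1_000_000; i++) {
                boolean[] mask = taskSerializer.computeNullMask(new String[] {"a", null});
                if (mask[0] || !mask[1]) {
                    System.out.println("task thread read a corrupted null mask: " + Arrays.toString(mask));
                }
            }
        };
        Runnable readerWork = () -> {
            for (int i = 0; i < 1_000_000; i++) {
                boolean[] mask = readerSerializer.computeNullMask(new String[] {null, "b"});
                if (!mask[0] || mask[1]) {
                    System.out.println("reader thread read a corrupted null mask: " + Arrays.toString(mask));
                }
            }
        };

        Thread t1 = new Thread(taskWork, "task");
        Thread t2 = new Thread(readerWork, "sortmerger-reader");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}

Running the two threads concurrently should occasionally print corrupted masks, while duplicating the serializer per thread (duplicateSafely) keeps the output clean. Again, this only illustrates the symptom; the actual root cause and fix are tracked in the JIRA issue above.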
Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> Thanks a lot Kurt!
>
> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
>> Thanks for the test case, I will take a look at it.
>>
>> Flavio Pompermaier <pomperma...@okkam.it> wrote on Thu, Apr 27, 2017 at 03:55:
>>> I've created a repository with a unit test to reproduce the error at
>>> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
>>> (probably this error is also related to FLINK-4719).
>>>
>>> The exception is thrown only when there are null strings and multiple slots per TM; I don't know whether the UnilateralSortMerger is involved or not (but I think so..).
>>> A quick fix for this problem would be very much appreciated because it's blocking a production deployment..
>>>
>>> Thanks in advance to all,
>>> Flavio
>>>
>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>> After digging into the code and tests, I think the problem is almost certainly in the UnilateralSortMerger; there should be a missing synchronization on some shared object somewhere... Right now I'm trying to understand if this section of code creates some shared object (like queues) that is accessed in a bad way when there's spilling to disk:
>>>>
>>>> // start the thread that reads the input channels
>>>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
>>>>     parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>>
>>>> // start the thread that sorts the buffers
>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>>>
>>>> // start the thread that handles spilling to secondary storage
>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>>>     memoryManager, ioManager, serializerFactory, comparator,
>>>>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
>>>> ....
>>>> startThreads();
>>>>
>>>> The problem is that the unit tests of GroupReduceDriver use Record, testing Rows is not very straightforward, and I'm still trying to reproduce the problem in a local env..
>>>>
>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>> Thanks for the explanation. Is there a way to force this behaviour in a local environment (to try to debug the problem)?
>>>>>
>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>>>> Hi Flavio,
>>>>>>
>>>>>> these files are used for spilling data to disk, in your case sorted runs of records.
>>>>>> Later, all these sorted runs (up to a fan-out threshold) are read and merged to get a completely sorted record stream.
>>>>>>
>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>> The error appears as soon as some taskmanager generates some inputchannel file.
>>>>>>> What are those files used for?
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>> In another run of the job I had another exception. Could it be helpful?
>>>>>>>>
>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>     ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>>>     ... 5 more
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:
>>>>>>>>> In fact, the old problem was with the KryoSerializer's missed initialization on the exception that would trigger the spilling to disk. This would lead to a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code generating the error. Perhaps someone could try the same also this time, if Flavio can make the problem reproducible in a shareable program+data.
>>>>>>>>>
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>> *From:* Stephan Ewen [mailto:se...@apache.org]
>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>
>>>>>>>>> In the past, these errors were most often caused by bugs in the serializers, not in the sorter.
>>>>>>>>>
>>>>>>>>> What types are you using at that point? The stack trace reveals Row and StringValue; any other involved types?
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>> I debugged the process a bit, repeating the job on a sub-slice of the entire data (using the id value to filter data with Parquet push-down filters), and all slices completed successfully :(
>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow a factor of stress, but it didn't cause any error.
>>>>>>>>> Then I almost doubled the number of rows to process and finally the error showed up again.
>>>>>>>>> It seems somehow related to spilling to disk, but I can't really understand what's going on :(
>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>
>>>>>>>>> 4 Task managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>>>
>>>>>>>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>> 4 TM with 8 GB and 4 slots each, parallelism = 16
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>>>>>>>
>>>>>>>>> Any help is appreciated..
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>> I could, but only if there's a good probability that it fixes the problem... how confident are you about it?
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>>> Looking at the git log of DataInputDeserializer.java, there has been some recent change.
>>>>>>>>>
>>>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>> Hi to all,
>>>>>>>>> I think I'm again hitting the weird exception with the SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to disk, but the exception thrown is not very helpful. Any idea?
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>>>>     ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>
>>> --
>> Best,
>> Kurt