I've created a repository with a unit test that reproduces the error:
https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(this error is probably also related to FLINK-4719).
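In essence, the test serializes a Row that contains null string fields and reads it back through a deserializer, which is the pattern that fails in the job. A minimal sketch of the idea (the field layout and values here are invented; the actual test in the repository differs):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

public class RowNullFieldRoundTrip {
    public static void main(String[] args) throws Exception {
        // a Row type with string fields, some of which will be null
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO);
        TypeSerializer<Row> serializer = rowType.createSerializer(new ExecutionConfig());

        Row row = new Row(3);
        row.setField(0, "some-id");
        row.setField(1, null);   // the null string field
        row.setField(2, 42L);

        // serialize the row into a byte array ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(row, new DataOutputViewStreamWrapper(bytes));

        // ... and read it back, as the SortMerger reading thread would
        Row copy = serializer.deserialize(new DataInputViewStreamWrapper(
                new ByteArrayInputStream(bytes.toByteArray())));

        if (copy.getField(1) != null || !"some-id".equals(copy.getField(0))) {
            throw new AssertionError("Row changed during the serialization round trip: " + copy);
        }
    }
}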
The exception is thrown only when there are null strings and multiple slots per TM; I don't know whether the UnilateralSortMerger is involved or not (but I think so..). A quick fix for this problem would be very much appreciated because it's blocking a production deployment..

Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> After digging into the code and the test, I think the problem is almost certainly in the UnilateralSortMerger; there is probably a missing synchronization on some shared object somewhere... Right now I'm trying to understand whether this section of code creates some shared object (like the queues) that is accessed in a bad way when there's spilling to disk:
>
> // start the thread that reads the input channels
> this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
>     parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>
> // start the thread that sorts the buffers
> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>
> // start the thread that handles spilling to secondary storage
> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>     memoryManager, ioManager, serializerFactory, comparator,
>     this.sortReadMemory, this.writeMemory, maxNumFileHandles);
> ....
> startThreads();
>
> The problem is that the unit tests of GroupReduceDriver use Record, and testing Rows is not very straightforward, so I'm still trying to reproduce the problem in a local env..
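> Locally I'm trying to force the spill path by shrinking the managed memory. This is roughly the setup I'm using (just a sketch: the memory value is arbitrary and I'm assuming the local mini-cluster honours taskmanager.memory.size the same way a standalone TaskManager does):
>
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.configuration.ConfigConstants;
> import org.apache.flink.configuration.Configuration;
>
> public class LocalSpillRepro {
>     public static void main(String[] args) throws Exception {
>         // shrink the managed memory so the sorter starts spilling even on small inputs
>         Configuration conf = new Configuration();
>         conf.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 32); // MB, deliberately tiny
>         conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
>
>         ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
>         env.setParallelism(4);
>
>         // ... build the same Row-based pipeline as the production job and call env.execute() ...
>     }
> }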
> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Thanks for the explanation. Is there a way to force this behaviour in a local environment (to try to debug the problem)?
>>
>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> these files are used for spilling data to disk, in your case sorted runs of records.
>>> Later, these sorted runs (up to a fan-out threshold) are read and merged to produce a completely sorted record stream.
>>>
>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>
>>>> The error appears as soon as some taskmanager generates an inputchannel file.
>>>> What are those files used for?
>>>>
>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> In another run of the job I got a different exception. Could it be helpful?
>>>>>
>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>     ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>     at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>     ... 5 more
>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:
>>>>>
>>>>>> In fact, the old problem was a missed initialization of the KryoSerializer on the exception that triggers spilling to disk. This left a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code that generated the error. Perhaps someone could try the same this time as well, if Flavio can make the problem reproducible with a shareable program + data.
>>>>>>
>>>>>> Stefano
>>>>>>
>>>>>> From: Stephan Ewen [mailto:se...@apache.org]
>>>>>> Sent: Friday, April 21, 2017 10:04 AM
>>>>>> To: user <user@flink.apache.org>
>>>>>> Subject: Re: UnilateralSortMerger error (again)
>>>>>>
>>>>>> In the past, these errors were most often caused by bugs in the serializers, not in the sorter.
>>>>>>
>>>>>> What types are you using at that point? The stack trace reveals Row and StringValue; any other involved types?
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> I debugged the process a bit by repeating the job on sub-slices of the entire dataset (using the id value to filter data with Parquet push-down filters), and all slices completed successfully :(
>>>>>>
>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see whether stress was somehow a factor, but it didn't cause any error.
>>>>>>
>>>>>> Then I almost doubled the number of rows to process and finally the error showed up again.
>>>>>>
>>>>>> It seems somehow related to spilling to disk, but I can't really understand what's going on :(
>>>>>>
>>>>>> This is a summary of my debug attempts:
>>>>>>
>>>>>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>
>>>>>> id < 10.000.000.000                            =>  1.857.365 rows => OK
>>>>>> id >= 10.000.000.000 && id < 10.010.000.000    => 20.057.714 rows => OK
>>>>>> id >= 10.010.000.000 && id < 99.945.000.000    => 20.926.903 rows => OK
>>>>>> id >= 99.945.000.000 && id < 99.960.000.000    => 23.888.750 rows => OK
>>>>>> id >= 99.960.000.000                           => 32.936.422 rows => OK
>>>>>>
>>>>>> 4 TM with 8 GB and 4 slots each, parallelism = 16
>>>>>>
>>>>>> id >= 99.960.000.000                           => 32.936.422 rows => OK
>>>>>> id >= 99.945.000.000                           => 56.825.172 rows => ERROR
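>>>>>> The slices above are obtained by filtering on the id column (in the real job via Parquet push-down filters). Just to make the ranges explicit, the equivalent as a plain DataSet filter would look roughly like this (a sketch: the id field position and type are assumptions):
>>>>>>
>>>>>> import org.apache.flink.api.common.functions.FilterFunction;
>>>>>> import org.apache.flink.api.java.DataSet;
>>>>>> import org.apache.flink.types.Row;
>>>>>>
>>>>>> public class IdSliceFilter {
>>>>>>     // keep only the rows whose id falls in [from, to); the id is assumed to be field 0
>>>>>>     public static DataSet<Row> slice(DataSet<Row> rows, final long from, final long to) {
>>>>>>         return rows.filter(new FilterFunction<Row>() {
>>>>>>             @Override
>>>>>>             public boolean filter(Row row) {
>>>>>>                 long id = (Long) row.getField(0);
>>>>>>                 return id >= from && id < to;
>>>>>>             }
>>>>>>         });
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> // e.g. the failing slice: slice(rows, 99945000000L, Long.MAX_VALUE)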
>>>>>> Any help is appreciated..
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> I could, but only if there's a good probability that it fixes the problem... how confident are you about it?
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>> Looking at the git log of DataInputDeserializer.java, there have been some recent changes.
>>>>>>
>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>> Hi to all,
>>>>>> I think I'm hitting the weird exception with the SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to disk, but the exception thrown is not very helpful. Any idea?
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>     ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>> Best,
>>>>>> Flavio