Thanks for the test case, I will take a look at it.

Flavio Pompermaier <pomperma...@okkam.it> wrote on Thu, 27 Apr 2017 at 03:55:
> I've created a repository with a unit test to reproduce the error at
> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
> (probably this error is also related to FLINK-4719).
>
> The exception is thrown only when there are null strings and multiple
> slots per TM; I don't know whether UnilateralSortMerger is involved or
> not (but I think so..).
> A quick fix for this problem would be much appreciated because it's
> blocking a production deployment..
>
> Thanks in advance to all,
> Flavio
>
> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> After digging into the code and tests I think that the problem is almost
>> certainly in the UnilateralSortMerger: there should be a missing
>> synchronization on some shared object somewhere... Right now I'm trying to
>> understand whether this section of code creates some shared object (like
>> queues) that is accessed in a bad way when there's spilling to disk:
>>
>> // start the thread that reads the input channels
>> this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
>>     parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>
>> // start the thread that sorts the buffers
>> this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);
>>
>> // start the thread that handles spilling to secondary storage
>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
>>     memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory,
>>     maxNumFileHandles);
>> ....
>> startThreads();
>>
>> The problem is that the unit tests of GroupReduceDriver use Record, and
>> testing Rows is not very straightforward, so I'm still trying to reproduce
>> the problem in a local env..
>>
>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Thanks for the explanation. Is there a way to force this behaviour in a
>>> local environment (to try to debug the problem)?
>>>
>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> these files are used for spilling data to disk, in your case sorted
>>>> runs of records.
>>>> Later, all these sorted runs (up to a fan-out threshold) are read and
>>>> merged to get a completely sorted record stream.
>>>>
>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>
>>>>> The error appears as soon as some taskmanager generates some
>>>>> inputchannel file.
>>>>> What are those files used for?
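(As an aside on the question above about forcing this behaviour in a local environment: one thing to try is starting a LocalEnvironment with very little managed memory and several slots, so the sorter spills even for small inputs. The sketch below is not from the thread; the class name, the generated data and the memory/slot values are made up, it assumes Flink 1.2, and whether the local mini-cluster honours these config keys would need to be verified.)

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class ForceSpillingLocally {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Keep managed memory small (value in MB) so the sorter spills early,
        // and use several slots per (local) TaskManager, as in the failing setup.
        config.setLong("taskmanager.memory.size", 32);
        config.setInteger("taskmanager.numberOfTaskSlots", 4);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
        env.setParallelism(4);

        // Made-up data: Rows with a grouping key and a String field that is sometimes null.
        RowTypeInfo rowType = new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        DataSet<Row> rows = env.generateSequence(0, 5_000_000)
                .map(i -> {
                    Row row = new Row(2);
                    row.setField(0, i % 1000);
                    row.setField(1, i % 7 == 0 ? null : "value-" + i);
                    return row;
                })
                .returns(rowType);

        // groupBy + reduceGroup runs through the UnilateralSortMerger, like the failing job.
        rows.groupBy(0)
            .reduceGroup(new GroupReduceFunction<Row, Long>() {
                @Override
                public void reduce(Iterable<Row> values, Collector<Long> out) {
                    long count = 0;
                    for (Row ignored : values) {
                        count++;
                    }
                    out.collect(count);
                }
            })
            .print();
    }
}

(Running such a job once with and once without null String fields would at least show whether the local sorter path is affected at all.)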
>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>>>
>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>> Writable), check their serialization methods. If you are using a
>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>> ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>> ... 5 more
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:
>>>>>>
>>>>>>> In fact, the old problem was a missed initialization of the
>>>>>>> KryoSerializer on the exception that triggers the spilling to disk.
>>>>>>> This would leave a dirty serialization buffer that would eventually
>>>>>>> break the program. Till worked on it by debugging the source code
>>>>>>> generating the error. Perhaps someone could try the same this time as
>>>>>>> well, if Flavio can make the problem reproducible in a shareable
>>>>>>> program + data.
>>>>>>>
>>>>>>> Stefano
>>>>>>>
>>>>>>> From: Stephan Ewen [mailto:se...@apache.org]
>>>>>>> Sent: Friday, April 21, 2017 10:04 AM
>>>>>>> To: user <user@flink.apache.org>
>>>>>>> Subject: Re: UnilateralSortMerger error (again)
>>>>>>>
>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>> serializers, not in the sorter.
>>>>>>>
>>>>>>> What types are you using at that point? The stack trace reveals Row and
>>>>>>> StringValue; are any other types involved?
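To make that question concrete: the stack trace points at RowSerializer and StringSerializer (via StringValue.readString), so a standalone round-trip of a Row with a null String field through exactly those serializers could look like the sketch below. This is not code from the thread and the class name is made up; since it exercises the serializers in isolation (no network stack, no spilling), it will most likely pass even though the job fails, but it pins down the types and the null case involved.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class RowNullFieldRoundTrip {

    public static void main(String[] args) throws Exception {
        // Same shape as the failing records: a key plus a String field that may be null.
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.LONG_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);
        TypeSerializer<Row> serializer = rowType.createSerializer(new ExecutionConfig());

        Row row = new Row(2);
        row.setField(0, 42L);
        row.setField(1, null); // the null String case reported to trigger the error

        // Serialize into a byte buffer ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(row, new DataOutputViewStreamWrapper(bytes));

        // ... and read it back: a serializer that consumes more (or fewer) bytes than it
        // wrote would show up here as an EOFException or as leftover bytes.
        ByteArrayInputStream in = new ByteArrayInputStream(bytes.toByteArray());
        Row copy = serializer.deserialize(new DataInputViewStreamWrapper(in));

        System.out.println("round-tripped: " + copy + ", leftover bytes: " + in.available());
    }
}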
>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>>>>>>> spilling to disk) and the job failed almost immediately..
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> I debugged the process a bit, repeating the job on sub-slices of the
>>>>>>> entire data (using the id value to filter data with Parquet push-down
>>>>>>> filters), and all slices completed successfully :(
>>>>>>>
>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>>>>>> if this was somehow a stress factor, but it didn't cause any error.
>>>>>>>
>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>> error showed up again.
>>>>>>>
>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>> understand what's going on :(
>>>>>>>
>>>>>>> This is a summary of my debug attempts:
>>>>>>>
>>>>>>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>>>>>>
>>>>>>> id < 10.000.000.000                          =>  1.857.365 rows => OK
>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
>>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>>
>>>>>>> 4 Task Managers with 8 GB and 4 slots each, parallelism = 16
>>>>>>>
>>>>>>> id >= 99.960.000.000                         => 32.936.422 rows => OK
>>>>>>> id >= 99.945.000.000                         => 56.825.172 rows => ERROR
>>>>>>>
>>>>>>> Any help is appreciated..
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> I could, but only if there's a good probability that it fixes the
>>>>>>> problem... how confident are you about it?
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>>
>>>>>>> Looking at the git log of DataInputDeserializer.java, there has been
>>>>>>> some recent change.
>>>>>>>
>>>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>>>> reproducible?
>>>>>>>
>>>>>>> Cheers
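For reference, the failing setup from the summary above would roughly correspond to the following flink-conf.yaml entries. This is only a sketch: the key names are the Flink 1.2 ones, everything not listed is assumed to stay at its default, and whether the "8 GB" per TaskManager refers to heap is an assumption.

# 4 TaskManagers, each with 4 slots and (assumed) 8 GB of heap; job parallelism 16
taskmanager.heap.mb: 8192
taskmanager.numberOfTaskSlots: 4
parallelism.default: 16

# as suggested earlier in the thread, cap managed memory (in MB) to force spilling to disk
taskmanager.memory.size: 1024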
>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>> Hi to all,
>>>>>>> I think I'm hitting the weird exception with the
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer again...
>>>>>>> I'm using Flink 1.2.0 and the error seems to arise when Flink spills to
>>>>>>> disk, but the exception thrown is not very helpful. Any idea?
>>>>>>>
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>> ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: null
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> Caused by: java.io.EOFException
>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio

--
Best,
Kurt