Thank you Kurt!

2017-04-27 17:40 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
Great!! Thanks a lot Kurt

On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <ykt...@gmail.com> wrote:

Hi, I have found the bug: https://issues.apache.org/jira/browse/FLINK-6398. I will open a PR soon.

Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Thanks a lot Kurt!

On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:

Thanks for the test case, I will take a look at it.

On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I've created a repository with a unit test that reproduces the error at
https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(this error is probably also related to FLINK-4719).

The exception is thrown only when there are null strings and multiple slots per TM; I don't know whether the UnilateralSortMerger is involved or not (but I think so..).
A quick fix for this problem would be very much appreciated because it's blocking a production deployment..

Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

After digging into the code and tests, I think the problem is almost certainly in the UnilateralSortMerger; there is probably a missing synchronization on some shared object somewhere... Right now I'm trying to understand whether this section of code creates some shared object (like the queues) that is accessed in an unsafe way when there is spilling to disk:

    // start the thread that reads the input channels
    this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
        parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));

    // start the thread that sorts the buffers
    this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);

    // start the thread that handles spilling to secondary storage
    this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
        memoryManager, ioManager, serializerFactory, comparator,
        this.sortReadMemory, this.writeMemory, maxNumFileHandles);
    ....
    startThreads();

The problem is that the unit tests of GroupReduceDriver use Record, testing Rows is not very straightforward, and I'm still trying to reproduce the problem in a local env..

On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Thanks for the explanation. Is there a way to force this behaviour in a local environment (to try to debug the problem)?

On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:

Hi Flavio,

these files are used for spilling data to disk, in your case sorted runs of records.
Later, all of these sorted runs (up to a fan-out threshold) are read and merged to produce a completely sorted record stream.

2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

The error appears as soon as some taskmanager generates some inputchannel file.
What are those files used for?
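To make the scenario in the messages above easier to picture, here is a minimal, hypothetical sketch of the kind of job being discussed: Rows whose String field is frequently null, a group-reduce that forces a sort-merge before the GroupReduceDriver, several slots, and a small managed-memory budget so that the sorter spills. It is not the linked unit test, the class name and all values are made up, and it is not guaranteed to reproduce the failure, which on the cluster also involved records arriving through network input channels.

    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class NullStringSpillSketch {

        public static void main(String[] args) throws Exception {
            // Small managed memory and several slots so the sort-merge path spills early
            // (illustrative values, not the configuration used in this thread).
            Configuration conf = new Configuration();
            conf.setLong("taskmanager.memory.size", 32);          // managed memory in MB
            conf.setInteger("taskmanager.numberOfTaskSlots", 4);
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
            env.setParallelism(4);

            // Rows of (Long key, String value) where the String field is often null,
            // declared as RowTypeInfo so the RowSerializer from the stack traces is used.
            RowTypeInfo rowType =
                    new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            DataSet<Row> input = env.generateSequence(0, 5_000_000)
                    .map(new MapFunction<Long, Row>() {
                        @Override
                        public Row map(Long i) {
                            Row row = new Row(2);
                            row.setField(0, i % 1_000);
                            row.setField(1, i % 3 == 0 ? null : "value-" + i);
                            return row;
                        }
                    })
                    .returns(rowType);

            // groupBy + reduceGroup runs through the sort (UnilateralSortMerger) before the
            // GroupReduceDriver, which is where the exceptions reported in this thread originate.
            input.groupBy(0)
                 .reduceGroup(new GroupReduceFunction<Row, Row>() {
                     @Override
                     public void reduce(Iterable<Row> values, Collector<Row> out) {
                         for (Row r : values) {
                             out.collect(r);
                         }
                     }
                 })
                 .returns(rowType)
                 .output(new DiscardingOutputFormat<Row>());

            env.execute("null-string spill sketch");
        }
    }

Passing a small taskmanager.memory.size to the local environment is also one possible answer to the question above about forcing the spilling behaviour locally, although the thread does not confirm that a local run reproduces the cluster failure.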
On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

In another run of the job I had another exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
    at org.apache.flink.types.StringValue.readString(StringValue.java:770)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
    ... 5 more

On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:

In fact, the old problem was with the KryoSerializer's missed initialization on the exception that triggers the spilling to disk. This would leave a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code that generated the error. Perhaps someone could try the same this time too, if Flavio can make the problem reproducible with a shareable program + data.
Stefano

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Friday, April 21, 2017 10:04 AM
To: user <user@flink.apache.org>
Subject: Re: UnilateralSortMerger error (again)

In the past, these errors were most often caused by bugs in the serializers, not in the sorter.

What types are you using at that point? The stack trace reveals Row and StringValue; are any other types involved?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I debugged the process a bit by repeating the job on sub-slices of the entire data (using the id value to filter data with Parquet push-down filters) and all slices completed successfully :(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see whether this was somehow a factor of stress, but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error showed up again.
It seems somehow related to spilling to disk, but I can't really understand what's going on :(

This is a summary of my debug attempts:

4 task managers with 6 GB and 1 slot each, parallelism = 4

id < 10.000.000.000                          =>  1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
id >= 99.960.000.000                         => 32.936.422 rows => OK

4 task managers with 8 GB and 4 slots each, parallelism = 16

id >= 99.960.000.000                         => 32.936.422 rows => OK
id >= 99.945.000.000                         => 56.825.172 rows => ERROR

Any help is appreciated..
Best,
Flavio
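For readers trying to map these settings: they live in flink-conf.yaml on the task managers. The excerpt below is only a hypothetical reconstruction from the figures quoted above (the actual configuration files used in these runs are not part of the thread):

    # illustrative flink-conf.yaml excerpt for the second set of runs above
    taskmanager.heap.mb: 8192            # 8 GB per task manager
    taskmanager.numberOfTaskSlots: 4     # 4 slots per TM, overall parallelism 16 with 4 TMs
    taskmanager.memory.size: 1024        # fixed managed memory in MB; small values make the sorter spill to disk earlier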
On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I could, but only if there's a good probability that it fixes the problem... how confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Looking at the git log of DataInputDeserializer.java, there have been some recent changes.

If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I think I'm hitting the weird exception with the SpillingAdaptiveSpanningRecordDeserializer again...
I'm using Flink 1.2.0 and the error seems to arise when Flink spills to disk, but the exception thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
    at org.apache.flink.types.StringValue.readString(StringValue.java:747)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Best,
Flavio

--
Best,
Kurt