Thanks a lot Kurt!

On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <ykt...@gmail.com> wrote:
Thanks for the test case, I will take a look at it.

On Thu, Apr 27, 2017 at 03:55, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I've created a repository with a unit test to reproduce the error at
https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(probably this error is also related to FLINK-4719).

The exception is thrown only when there are null strings and multiple slots per TM; I don't know whether the UnilateralSortMerger is involved or not (but I think so..).
A quick fix for this problem would be very much appreciated because it's blocking a production deployment..

Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

After digging into the code and tests I think that the problem is almost certainly in the UnilateralSortMerger; there is probably a missing synchronization on some shared object somewhere. Right now I'm trying to understand whether this section of code creates some shared object (like the queues) that is accessed in a bad way when there's spilling to disk:

// start the thread that reads the input channels
this.readThread = getReadingThread(exceptionHandler, input, circularQueues, largeRecordHandler,
    parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));

// start the thread that sorts the buffers
this.sortThread = getSortingThread(exceptionHandler, circularQueues, parentTask);

// start the thread that handles spilling to secondary storage
this.spillThread = getSpillingThread(exceptionHandler, circularQueues, parentTask,
    memoryManager, ioManager, serializerFactory, comparator,
    this.sortReadMemory, this.writeMemory, maxNumFileHandles);
....
startThreads();

The problem is that the unit tests of GroupReduceDriver use Record, and testing Rows is not very straightforward; I'm still trying to reproduce the problem in a local env..
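The core of such a test is a serialize/deserialize round trip of a Row whose string fields may be null. The sketch below is hypothetical and not taken from the linked repository; it only assumes the public Flink 1.2 APIs that also show up in the stack traces further down (RowTypeInfo, the generated RowSerializer, and the stream-wrapper views):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.types.Row;

public class RowNullFieldRoundTrip {

    public static void main(String[] args) throws Exception {
        // Two string fields; the second one is deliberately null, which is the
        // case reported to trigger the failure on the cluster.
        RowTypeInfo typeInfo = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        TypeSerializer<Row> serializer = typeInfo.createSerializer(new ExecutionConfig());

        Row row = new Row(2);
        row.setField(0, "some-value");
        row.setField(1, null);

        // Serialize into a byte buffer ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(row, new DataOutputViewStreamWrapper(bytes));

        // ... and read it back. A broken serializer would surface here as an
        // EOFException or as "Serializer consumed more bytes than the record had".
        Row copy = serializer.deserialize(
                new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);
    }
}

A full reproduction of the reported error additionally needs the network serialization stack and the spilling path in between, which is what the linked unit test and the multi-slot setup are meant to exercise.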
On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Thanks for the explanation. Is there a way to force this behaviour in a local environment (to try to debug the problem)?

On 21 Apr 2017 21:49, "Fabian Hueske" <fhue...@gmail.com> wrote:

Hi Flavio,

these files are used for spilling data to disk, in your case sorted runs of records.
Later all of these sorted runs (up to a fan-out threshold) are read and merged to get a completely sorted record stream.
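Conceptually, the merge step described above is a k-way merge of already-sorted spill files. The following plain-Java sketch illustrates that idea only; it is not Flink's implementation, which streams the runs from disk through managed memory segments instead of holding them in lists:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {

    /** Merges already-sorted runs into one fully sorted stream. */
    static <T extends Comparable<T>> List<T> merge(List<List<T>> sortedRuns) {
        // One cursor per run; the priority queue always exposes the smallest current head.
        PriorityQueue<RunCursor<T>> heads =
                new PriorityQueue<>((a, b) -> a.head.compareTo(b.head));
        for (List<T> run : sortedRuns) {
            Iterator<T> it = run.iterator();
            if (it.hasNext()) {
                heads.add(new RunCursor<>(it.next(), it));
            }
        }

        List<T> result = new ArrayList<>();
        while (!heads.isEmpty()) {
            RunCursor<T> smallest = heads.poll();
            result.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heads.add(smallest); // re-insert with its next element as the head
            }
        }
        return result;
    }

    static final class RunCursor<T> {
        T head;
        final Iterator<T> rest;

        RunCursor(T head, Iterator<T> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    public static void main(String[] args) {
        // Three "spilled runs", each sorted on its own, merged into one sorted stream.
        System.out.println(merge(Arrays.asList(
                Arrays.asList(1, 4, 9),
                Arrays.asList(2, 3, 8),
                Arrays.asList(5, 6, 7))));
    }
}

When there are more spill files than the fan-out threshold allows, such merges are applied in several passes until a single sorted stream remains.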
2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

The error appears as soon as some taskmanager generates some inputchannel file.
What are those files used for?

On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

In another run of the job I had another exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
    at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
    at org.apache.flink.types.StringValue.readString(StringValue.java:770)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
    ... 5 more

On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <stefano.bort...@huawei.com> wrote:

In fact, the old problem was that the KryoSerializer missed an initialization on the exception that triggered the spilling to disk. This left a dirty serialization buffer that would eventually break the program. Till worked on it by debugging the source code that generated the error. Perhaps someone could try the same this time as well, if Flavio can make the problem reproducible with a shareable program and data.

Stefano

From: Stephan Ewen [mailto:se...@apache.org]
Sent: Friday, April 21, 2017 10:04 AM
To: user <user@flink.apache.org>
Subject: Re: UnilateralSortMerger error (again)

In the past, these errors were most often caused by bugs in the serializers, not in the sorter.

What types are you using at that point? The stack trace reveals Row and StringValue; any other involved types?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..
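On the earlier question of forcing this behaviour in a local environment: one untested possibility is to start a local environment whose configuration mirrors the failing cluster, with little managed memory (so the sorter spills early) and several task slots. The configuration keys are the ones discussed in this thread, but the concrete values below are assumptions and may need tuning before spilling actually kicks in for a given dataset:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalSpillingRepro {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Small managed memory (in MB) so the sort buffers fill up and spill early.
        conf.setString("taskmanager.memory.size", "64");
        // Several slots per (local) task manager, mirroring the failing cluster setup.
        conf.setString("taskmanager.numberOfTaskSlots", "4");

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
        env.setParallelism(4);

        // ... build the same DataSet<Row> pipeline that fails on the cluster
        //     (read the Parquet input, groupBy, reduceGroup) and call env.execute() ...
    }
}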
On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I debugged the process a bit by repeating the job on sub-slices of the entire data (using the id value to filter data with Parquet push-down filters) and all slices completed successfully :(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow a factor of stress, but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error showed up again.

It seems somehow related to spilling to disk but I can't really understand what's going on :(

This is a summary of my debug attempts:

4 task managers with 6 GB and 1 slot each, parallelism = 4:

id < 10.000.000.000                          =>  1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
id >= 99.960.000.000                         => 32.936.422 rows => OK

4 task managers with 8 GB and 4 slots each, parallelism = 16:

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000 => 56.825.172 rows => ERROR

Any help is appreciated..

Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

I could, but only if there's a good probability that it fixes the problem... how confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Looking at the git log of DataInputDeserializer.java, there has been some recent change.

If you have time, maybe try with the 1.2.1 RC and see if the error is reproducible?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,

I think I'm again hitting the weird exception with the SpillingAdaptiveSpanningRecordDeserializer...
I'm using Flink 1.2.0 and the error seems to arise when Flink spills to disk, but the exception thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
    at org.apache.flink.types.StringValue.readString(StringValue.java:747)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Best,
Flavio

--
Best,
Kurt