The types I read are: [String, String, String, String, String, String, String, String, String, Boolean, Long, Long, Long, Integer, Integer, Long, String, String, Long, Long, String, Long, String, String, String, String, String, String, String, String, String, String, String, String, String, String, String]
On Fri, Apr 21, 2017 at 10:03 AM, Stephan Ewen <se...@apache.org> wrote:

> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
> What types are you using at that point? The stack trace reveals Row and
> StringValue; any other involved types?
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately.
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> I debugged the process a bit by repeating the job on sub-slices of the
>>> entire data (using the id value to filter data with Parquet push-down
>>> filters), and all slices completed successfully :(
>>> So I increased the parallelism (from 1 slot per TM to 4) to see
>>> whether this was somehow a factor of stress, but it didn't cause any error.
>>> Then I almost doubled the number of rows to process, and finally the
>>> error showed up again.
>>> It seems somehow related to spilling to disk, but I can't really
>>> understand what's going on :(
>>> This is a summary of my debug attempts:
>>>
>>> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000 => 1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000 => 20.926.903 rows => OK
>>> id >= 99.945.000.000 && id < 99.960.000.000 => 23.888.750 rows => OK
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>> 4 TMs with 8 GB and 4 slots each, parallelism = 16
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>>>
>>> Any help is appreciated.
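[Editor's note: for readers following along, the setting mentioned above can be pinned in flink-conf.yaml. A minimal sketch, assuming Flink 1.2 where taskmanager.memory.size is given in megabytes and a fixed (rather than relative) budget makes the sorter spill to disk sooner:]

```yaml
# flink-conf.yaml (Flink 1.2)
# Fix the managed memory of each TaskManager at 1024 MB instead of the
# default relative fraction, so sorts/joins exhaust their budget and
# spill to disk earlier -- useful to reproduce spilling-related bugs.
taskmanager.memory.size: 1024
```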
>>> Best,
>>> Flavio
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> I could, but only if there's a good probability that it fixes the
>>>> problem... how confident are you about it?
>>>>
>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> Looking at the git log of DataInputDeserializer.java, there has been some
>>>>> recent change.
>>>>>
>>>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>>>> reproducible?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> I think I'm again hitting the weird exception with the
>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills
>>>>>> to disk, but the exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>     ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>> Best,
>>>>>> Flavio
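[Editor's note: the EOFException at the bottom of the stack trace above means the deserializer asked for more bytes than the buffer still held. Below is a minimal, self-contained sketch of that failure mode using plain java.io, not Flink's actual variable-length encoding in StringValue.readString: a length-prefixed string record whose tail is missing reads back a valid length and then runs out of payload. The class and method names are hypothetical, chosen only for this illustration.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: a string field serialized as (int length, UTF-8 bytes),
// loosely analogous to how a row serializer writes fields one after another.
// If the buffer read back from a spill file is cut short, the reader sees a
// plausible length prefix but hits end-of-stream while reading the payload,
// and the failure surfaces as an EOFException deep inside deserialization.
public class TruncatedRecordDemo {

    public static byte[] serialize(String s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length); // length prefix
            out.write(utf8);           // payload
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static String deserialize(byte[] buf) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf));
            byte[] utf8 = new byte[in.readInt()];
            in.readFully(utf8);        // throws EOFException if buf was truncated
            return new String(utf8, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] good = serialize("flavio");
        System.out.println(deserialize(good)); // round-trips fine: prints "flavio"

        // Simulate a record whose tail was lost between writing and reading back.
        byte[] truncated = new byte[good.length - 2];
        System.arraycopy(good, 0, truncated, 0, truncated.length);
        try {
            deserialize(truncated);
        } catch (UncheckedIOException e) {
            System.out.println("caught " + e.getCause()); // java.io.EOFException
        }
    }
}
```

[The same symptom would appear if a serializer bug wrote fewer bytes than the length prefix promised, which is why the thread suspects the serializers rather than the sorter itself.]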