As suggested by Fabian, I set taskmanager.memory.size = 1024 (to force spilling to disk), and the job failed almost immediately.
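
For reference, this is the exact entry in flink-conf.yaml (the value is in megabytes; everything else in my configuration is unchanged):

    taskmanager.memory.size: 1024

Shrinking the managed memory like this should make the sorter run out of in-memory buffers much earlier, so the SortMerger is forced to spill.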
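For completeness, the id slicing I describe in the quoted thread below was done with Parquet push-down filters along these lines (a sketch using parquet-hadoop's FilterApi; the helper class name and the wiring into my actual input format are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetInputFormat;

    public class IdSliceFilter {
        // Builds a Hadoop Configuration that keeps only rows with
        // lo <= id < hi; Parquet can then skip non-matching row groups
        // at read time instead of materializing them.
        public static Configuration sliceById(long lo, long hi) {
            FilterPredicate slice = FilterApi.and(
                    FilterApi.gtEq(FilterApi.longColumn("id"), lo),
                    FilterApi.lt(FilterApi.longColumn("id"), hi));
            Configuration conf = new Configuration();
            ParquetInputFormat.setFilterPredicate(conf, slice);
            return conf;
        }
    }

For example, sliceById(10_000_000_000L, 10_010_000_000L) selects the 20.057.714-row slice listed below.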
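Also, a note on the unhelpful "terminated due to an exception: null" in the quoted stack trace: the underlying EOFException is constructed without a message, so "null" is literally the cause's message. A minimal sketch of the failure mode (a simplified stand-in, not Flink's actual DataInputDeserializer):

    import java.io.EOFException;
    import java.io.IOException;

    // Simplified stand-in for a deserializer view over a fixed byte range.
    class FixedBufferInput {
        private final byte[] buffer;
        private int position;
        private final int end;

        FixedBufferInput(byte[] buffer, int start, int len) {
            this.buffer = buffer;
            this.position = start;
            this.end = start + len;
        }

        // If a record claims more bytes than the buffer actually holds
        // (e.g. a truncated or corrupted spill), the read runs past 'end'
        // and a bare EOFException (message == null) propagates up to the
        // SortMerger's reading thread.
        int readUnsignedByte() throws IOException {
            if (position < end) {
                return buffer[position++] & 0xff;
            }
            throw new EOFException();
        }
    }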
On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I debugged the process a bit, repeating the job on sub-slices of the
> entire data (using the id value to filter data with Parquet push-down
> filters), and all slices completed successfully :(
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
> if this was somehow a factor of stress, but it didn't cause any error.
> Then I almost doubled the number of rows to process, and finally the
> error showed up again.
> It seems somehow related to spilling to disk, but I can't really
> understand what's going on :(
>
> This is a summary of my debug attempts:
>
> 4 Task Managers with 6 GB and 1 slot each, parallelism = 4:
>
> id < 10.000.000.000                          =>  1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000  => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000  => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000  => 23.888.750 rows => OK
> id >= 99.960.000.000                         => 32.936.422 rows => OK
>
> 4 Task Managers with 8 GB and 4 slots each, parallelism = 16:
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
> id >= 99.945.000.000 => 56.825.172 rows => ERROR
>
> Any help is appreciated.
>
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> I could, but only if there's a good probability that it fixes the
>> problem... how confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Looking at the git log of DataInputDeserializer.java, there have been
>>> some recent changes.
>>>
>>> If you have time, maybe try with the 1.2.1 RC and see if the error is
>>> reproducible?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> I think I'm again hitting the weird exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>> I'm using Flink 1.2.0, and the error seems to arise when Flink spills
>>>> to disk, but the exception thrown is not very helpful. Any idea?
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>     ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>     at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
>>>>     at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
>>>>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>> Best,
>>>> Flavio