Hello everyone,

I have a Flink batch job that reads four CSV files. The rows from all four files are read and then grouped together.

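For readers unfamiliar with this kind of pipeline, here is a minimal plain-Java sketch (no Flink involved) of the read-and-group pattern. The `key,amount` column layout, the summing reduce, and the names `CsvGroupSketch` and `sumByKey` are hypothetical and only for illustration. The real job groups Flink `Row` records containing `BigDecimal` fields with a `GroupReduce` (see the stack traces), and Flink does that grouping with an external sort that spills to disk rather than with an in-memory map as done here.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CsvGroupSketch {

    // Groups the rows of several CSV "files" (given here as in-memory lines) by key.
    // Hypothetical layout: each line is "key,amount"; the amount is parsed as BigDecimal.
    // Hypothetical reduce: the amounts of all rows sharing a key are summed.
    public static Map<String, BigDecimal> sumByKey(List<List<String>> files) {
        Map<String, BigDecimal> sums = new TreeMap<>(); // keeps keys sorted, like a sort-based grouping
        for (List<String> file : files) {               // one list of lines per input file
            for (String line : file) {
                String[] fields = line.split(",");
                String key = fields[0].trim();
                BigDecimal amount = new BigDecimal(fields[1].trim());
                sums.merge(key, amount, BigDecimal::add); // combine rows that share a key
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Four hypothetical input files, mirroring the four CSV files of the job.
        List<List<String>> files = List.of(
                List.of("A,1.50", "B,2.00"),
                List.of("A,2.25"),
                List.of("C,0.10"),
                List.of("B,3.00"));
        System.out.println(sumByKey(files)); // prints {A=3.75, B=5.00, C=0.10}
    }
}
```

With small inputs everything fits in memory; the failures below only appear once Flink starts spilling sort buffers for the large inputs.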
When the four CSV files are small, the job finishes successfully. However, when the input files are large, the job fails with the exceptions shown below.

Could somebody please help me fix this problem?

Best regards,

Ferry

2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.

2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
        at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
        at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask                  - Releasing all broadcast variables.:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)

2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Spilling buffer 0.

2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.

2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        at java.lang.Thread.join(Thread.java:1323)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)

2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Emitting final buffer from reader thread: 1.
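Both traces end in a `java.io.EOFException` raised while a `BigDecimal` field of a `Row` is being copied (spilling thread) or deserialized (reading thread) by `BigDecSerializer`. The plain-Java sketch below only illustrates that general failure mode: a length-prefixed reader throws `EOFException` as soon as it expects more bytes than the writer actually produced. The encoding used here (length, unscaled bytes, scale) and the class name `BigDecimalFramingSketch` are simplified assumptions, not Flink's actual `BigDecSerializer` format, and the sketch does not identify the root cause of this job's failure. The `EOFException` in the traces carries no message, which is presumably why the wrapping exceptions end in `: null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

public class BigDecimalFramingSketch {

    // Simplified, assumed encoding (NOT Flink's format): [int length][unscaled bytes][int scale]
    public static byte[] write(BigDecimal value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        byte[] unscaled = value.unscaledValue().toByteArray(); // two's-complement, big-endian
        out.writeInt(unscaled.length);
        out.write(unscaled);
        out.writeInt(value.scale());
        out.flush();
        return bytes.toByteArray();
    }

    public static BigDecimal read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int length = in.readInt();     // EOFException if fewer than 4 bytes remain
        byte[] unscaled = new byte[length];
        in.readFully(unscaled);        // EOFException if the buffer is too short
        int scale = in.readInt();      // EOFException if the scale was cut off
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) throws IOException {
        byte[] full = write(new BigDecimal("12345.6789"));
        System.out.println(read(full)); // prints 12345.6789

        // Drop the last two bytes, as if part of the record had been lost.
        byte[] truncated = Arrays.copyOf(full, full.length - 2);
        try {
            read(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException, message: " + e.getMessage());
        }
    }
}
```

A mismatch of this kind (the reader consuming a different number of bytes than the writer produced for a record) is one general way such an `EOFException` can arise; whether that is what happens inside the sort/spill path here is an open question.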
