[ https://issues.apache.org/jira/browse/FLINK-9812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617593#comment-16617593 ]

Nico Kruber commented on FLINK-9812:
------------------------------------

Nice find [~yanghua].
Actually, setting {{spillingChannel = null}} here will just lead to a different error. After looking at it once more, this loop uses the {{SpillingAdaptiveSpanningRecordDeserializer}} incorrectly:
{code}
for (SerializationTestType record : records) {

    serializedRecords.add(record);

    numRecords++;

    // serialize record
    if (serializer.addRecord(record).isFullBuffer()) {
        // buffer is full => start deserializing
        deserializer.setNextBuffer(serializationResult.buildBuffer());

        while (!serializedRecords.isEmpty()) {
            SerializationTestType expected = serializedRecords.poll();
            SerializationTestType actual = expected.getClass().newInstance();

            if (deserializer.getNextRecord(actual).isFullRecord()) {
                Assert.assertEquals(expected, actual);
                numRecords--;
            } else {
                serializedRecords.addFirst(expected);
                break;
            }
        }

        // move buffers as long as necessary (for long records)
        while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
            deserializer.setNextBuffer(serializationResult.buildBuffer());
            serializer.clear();
        }
    }
}
{code}

After calling {{deserializer.setNextBuffer()}}, the deserializer should be drained of all stored records before any further buffer is added:
- while spilling a spanning record, only {{getNextRecord()}} actually calls {{SpanningWrapper#moveRemainderToNonSpanningDeserializer()}}, and adding more buffers may silently corrupt existing data (from {{segmentRemaining}})
- the same applies without spilling
- without spanning records, this test does not get into this situation
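
The drain-before-add contract described above can be sketched with a toy model. This is only an illustration under stated assumptions: {{ToyDeserializer}}, {{roundTrip}} and the length-prefixed record format are hypothetical and are not Flink's classes or wire format. Unlike the real deserializer, which according to the analysis above may silently corrupt data, the toy rejects a buffer that is added before the previous one was drained.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DrainBeforeAddSketch {

    /** Toy stand-in for a spanning deserializer (hypothetical, not the Flink class). */
    static final class ToyDeserializer {
        private byte[] pending = new byte[0]; // bytes received but not yet consumed
        private boolean drained = true;       // true once getNextRecord() found no full record

        /** Accepts the next buffer; rejects it if the previous one was not drained. */
        void setNextBuffer(byte[] buffer) {
            if (!drained) {
                throw new IllegalStateException("drain all records before adding a buffer");
            }
            byte[] merged = Arrays.copyOf(pending, pending.length + buffer.length);
            System.arraycopy(buffer, 0, merged, pending.length, buffer.length);
            pending = merged;
            drained = false;
        }

        /** Returns the next complete record, or null if more bytes are needed. */
        byte[] getNextRecord() {
            if (pending.length >= 4) {
                int length = ByteBuffer.wrap(pending).getInt();
                if (pending.length >= 4 + length) {
                    byte[] record = Arrays.copyOfRange(pending, 4, 4 + length);
                    pending = Arrays.copyOfRange(pending, 4 + length, pending.length);
                    return record;
                }
            }
            drained = true; // only a partial record (or nothing) is left
            return null;
        }
    }

    /** Writes length-prefixed records, cuts them into segments, and reads them back. */
    static List<String> roundTrip(List<String> records, int segmentSize) {
        int total = 0;
        for (String r : records) {
            total += 4 + r.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer stream = ByteBuffer.allocate(total);
        for (String r : records) {
            byte[] bytes = r.getBytes(StandardCharsets.UTF_8);
            stream.putInt(bytes.length).put(bytes);
        }
        byte[] all = stream.array();

        ToyDeserializer deserializer = new ToyDeserializer();
        List<String> result = new ArrayList<>();
        for (int offset = 0; offset < all.length; offset += segmentSize) {
            int end = Math.min(all.length, offset + segmentSize);
            deserializer.setNextBuffer(Arrays.copyOfRange(all, offset, end));
            // Drain every complete record before handing over the next buffer.
            byte[] record;
            while ((record = deserializer.getNextRecord()) != null) {
                result.add(new String(record, StandardCharsets.UTF_8));
            }
        }
        return result;
    }
}
```

The point of the sketch is the shape of the loop in {{roundTrip}}: every {{setNextBuffer()}} call is followed by a loop that calls {{getNextRecord()}} until no full record is left, for short and spanning records alike.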

I'll prepare a fix for that.

> SpanningRecordSerializationTest fails on travis
> -----------------------------------------------
>
>                 Key: FLINK-9812
>                 URL: https://issues.apache.org/jira/browse/FLINK-9812
>             Project: Flink
>          Issue Type: Bug
>          Components: Network, Tests, Type Serialization System
>    Affects Versions: 1.6.0
>            Reporter: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.7.0, 1.6.2
>
>
> https://travis-ci.org/zentol/flink/jobs/402744191
> {code}
> testHandleMixedLargeRecords(org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest)  Time elapsed: 6.113 sec  <<< ERROR!
> java.nio.channels.ClosedChannelException: null
>       at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
>       at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:199)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.addNextChunkFromMemorySegment(SpillingAdaptiveSpanningRecordDeserializer.java:529)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$SpanningWrapper.access$200(SpillingAdaptiveSpanningRecordDeserializer.java:431)
>       at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.setNextBuffer(SpillingAdaptiveSpanningRecordDeserializer.java:76)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:149)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testSerializationRoundTrip(SpanningRecordSerializationTest.java:115)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializationTest.testHandleMixedLargeRecords(SpanningRecordSerializationTest.java:104)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
