Thanks for your feedback. I am not running IPSec, nor am I using the aesni-intel module.

So far, I have not been able to reproduce the reordering issue. I have also 
discovered that my code might have created String objects with invalid UTF-16 
content in exactly those jobs that suffered from the reordering. I wanted to 
use binary data as a key, so I put it into a char array and wrapped it in a 
String, hoping that would do the trick. Maybe this caused an error in the 
String (de-)serialization with further consequences, e.g., the skipping of a 
buffer. Since I switched to encoding the binary data in Base64, the reordering 
error has not popped up again. I will do some more runs to verify this.
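For illustration, here is a minimal, hypothetical sketch (not the actual job code) of why packing raw bytes into a char array can produce a String with invalid UTF-16, while Base64 encoding is safe by construction:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class BinaryKeyDemo {
    public static void main(String[] args) {
        // Arbitrary binary key; the first two bytes happen to pack into 0xD800,
        // an unpaired high surrogate, making the resulting String invalid UTF-16.
        byte[] binary = {(byte) 0xD8, 0x00, 0x00, 0x41};

        char[] chars = new char[binary.length / 2];
        for (int i = 0; i < chars.length; i++) {
            chars[i] = (char) (((binary[2 * i] & 0xFF) << 8) | (binary[2 * i + 1] & 0xFF));
        }
        String unsafeKey = new String(chars);

        // A charset round trip replaces the unpaired surrogate with '?',
        // so the original bytes cannot be recovered.
        String roundTripped =
                new String(unsafeKey.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
        System.out.println("lossy: " + !unsafeKey.equals(roundTripped));

        // Base64 (java.util.Base64, Java 8+) yields a pure-ASCII String,
        // which is always valid UTF-16 and round-trips exactly.
        String safeKey = Base64.getEncoder().encodeToString(binary);
        System.out.println("base64 ok: "
                + Arrays.equals(Base64.getDecoder().decode(safeKey), binary));
    }
}
```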

However, the other problem, the serialization exceptions, should not be 
affected by this. To see whether network stream corruption is an issue, I am 
now also performing some runs on a larger single machine with a single task 
manager, so that no network communication is involved. Serialization should 
take place anyway, right? Is there a way to run a second task manager on the 
same machine within the same OS?

Cheers,
Sebastian

-----Original Message-----
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Wednesday, June 3, 2015 20:14
To: dev@flink.apache.org
Subject: Re: Buffer re-ordering problem

Hi Sebastian!

The first error that you report could also be a lost buffer, rather than a 
reordering.
The second error that you post seems to be a serialization issue.
Both can be consequences of network stream corruption.

It would be good to figure out why the netty event loop does not encode/decode 
this properly in your case.
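For intuition, here is a toy sketch (assumed behavior, not Flink's actual implementation) of the kind of sequence-number check a receiver such as RemoteInputChannel performs; a gap like expecting 17841 but seeing 17842 is consistent with a buffer lost in transit rather than a true reordering:

```java
// Toy receiver-side sequence check: each incoming buffer must carry the
// next expected sequence number; any gap signals a lost or reordered buffer.
public class SequenceChecker {
    private int expectedSequenceNumber = 0;

    /** Returns null if the buffer is in order; otherwise an error description. */
    public String onBuffer(int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            return "expected buffer with sequence number " + expectedSequenceNumber
                    + ", but received " + sequenceNumber;
        }
        expectedSequenceNumber++;
        return null;
    }

    public static void main(String[] args) {
        SequenceChecker checker = new SequenceChecker();
        // Buffers 0..17840 arrive in order.
        for (int i = 0; i < 17841; i++) {
            checker.onBuffer(i);
        }
        // Buffer 17841 is lost somewhere; the next arrival is 17842.
        System.out.println(checker.onBuffer(17842));
    }
}
```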

BTW: I would assume we can virtually rule out TCP/kernel bugs as sources for 
the corruption, unless you have this setup here:
http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/

Stephan


On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Sebastian,
>
> would you mind sharing the code and dataset with me (privately would 
> work fine). I want to try to reproduce this as well.
>
> – Ufuk
>
> On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian 
> <sebastian.kr...@hpi.de>
> wrote:
>
> > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an
> > older build, but I recently manually updated the flink-dist.jar with
> > commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> >
> > Our setup consists of 10 workers and a master, all interconnected via a
> > switch (100 Mbit, I think). The data set is an NTriple file of about
> > 8 GB; intermediate datasets, however, might be much larger. For smaller
> > datasets, I have not been able to observe this problem yet.
> >
> > Also, during the failure there are a lot of concurrent shuffles
> > ongoing [1,2]. Additionally, it might be interesting that in the
> > affected jobs either this exception occurs or another one that looks
> > like a network disruption [3]. So it might well be that our setup
> > suffers from occasional network errors, especially during high
> > network load - but that's just a plain guess.
> >
> > Regarding reproducibility, I can only say right now that this error
> > occurred twice. I will re-run the jobs, see if the errors can be
> > observed again, and let you know.
> >
> > Cheers,
> > Sebastian
> >
> > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.EOFException
> > at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> >
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Wednesday, June 3, 2015 10:33
> > To: dev@flink.apache.org
> > Subject: Re: Buffer re-ordering problem
> >
> > This is a critical bug.
> >
> > - which version are you using? If snapshot, which commit?
> > - what is your setup? Number of machines, dataset etc.?
> > - is it reproducible?
> >
> > On Wednesday, June 3, 2015, Kruse, Sebastian 
> > <sebastian.kr...@hpi.de>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > I had some jobs running overnight, and in two of them the following
> > > exception occurred after about half an hour. Do you know why this
> > > happens?
> > >
> > > Thanks,
> > > Sebastian
> > >
> > > tenem16.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > ---
> > > Sebastian Kruse
> > > PhD student, Information Systems Group
> > > Hasso-Plattner-Institut, University of Potsdam
> > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam, Tel +49 331 5509 240
> > > District Court (Amtsgericht) Potsdam, HRB 12184
> > > Management: Prof. Dr. Christoph Meinel
> > >
> > >
> > >
> >
>
