Thanks for your feedback. I am running neither IPSec nor the aesni-intel module.
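A side note on the binary-data-as-String key discussed below: arbitrary bytes packed into a char[] can form unpaired UTF-16 surrogates, and a String containing them is not guaranteed to survive a byte-level round trip unchanged, whereas a Base64-encoded key stays within plain ASCII. A minimal sketch of the failure mode (plain JDK APIs, not Flink's serializers):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BinaryKeyDemo {
    public static void main(String[] args) {
        // Arbitrary binary data packed into chars: 0xD800 is an unpaired
        // high surrogate, so the resulting String is not valid UTF-16.
        char[] raw = { 0xD800, 0x0041 };
        String broken = new String(raw);

        // Encoding to bytes and back (as a serializer might do) silently
        // replaces the unpaired surrogate, so the round trip is lossy.
        String roundTripped = new String(
                broken.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
        System.out.println(broken.equals(roundTripped)); // false

        // A Base64-encoded key is plain ASCII and survives any String
        // (de-)serialization unchanged.
        byte[] binaryKey = { (byte) 0xD8, 0x00, 0x41 };
        String safeKey = Base64.getEncoder().encodeToString(binaryKey);
        String safeRoundTripped = new String(
                safeKey.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8);
        System.out.println(safeKey.equals(safeRoundTripped)); // true
    }
}
```

Whether Flink's StringValue serialization mangles such a String in exactly the way that skips a buffer is an open question, but the lossiness itself is easy to demonstrate.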
So far, I could not reproduce the reordering issue. However, I have discovered that my code might have created String objects with invalid UTF-16 content in exactly those jobs that suffered from the reordering: I wanted to use binary data as a key, so I put it into a char array and wrapped that in a String, hoping it would do the trick. Maybe this caused an error in the String (de-)serialization, with further consequences, e.g., the skipping of a buffer. Since I started encoding the binary data in Base64, the reordering error has not popped up again. I will do some more runs to verify this. However, the other problem with the serialization exceptions should not be affected by this.

To see whether network stream corruption is an issue, I am also performing some runs now on a larger single machine with a single task manager, so that no network communication is involved. Serialization should take place anyway, right? Is there a way to run a second task manager on the same machine within the same OS?

Cheers,
Sebastian

-----Original Message-----
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
Sent: Wednesday, June 3, 2015 20:14
To: dev@flink.apache.org
Subject: Re: Buffer re-ordering problem

Hi Sebastian!

The first error that you report could also be a lost buffer, rather than a reordering. The second error that you post seems to be a serialization issue. Both can be consequences of a network stream corruption.

It would be good to figure out why the netty event loop does not encode/decode this properly in your case.

BTW: I would assume we can virtually rule out TCP/kernel bugs as sources for the corruption, unless you have this setup here:
http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/

Stephan

On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:
> Hey Sebastian,
>
> would you mind sharing the code and dataset with me (privately would
> work fine).
> I want to try to reproduce this as well.
>
> – Ufuk
>
> On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> >
> > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an
> > older build, but I recently manually updated the flink-dist.jar with
> > commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> >
> > Our setup consists of 10 workers and a master, all interconnected
> > via a switch (100 Mbit, I think). The data set is an NTriple file of
> > about 8 GB; intermediate datasets, however, might be much larger. For
> > smaller datasets, I could not observe this problem yet.
> >
> > Also, during the failure there are a lot of concurrent shuffles
> > ongoing [1,2]. Additionally, it might be interesting that in the
> > affected jobs either this exception occurs or another one that looks
> > like a network disruption [3]. So it might well be that our setup
> > suffers from occasional network errors, especially during high
> > network load - but that's just a plain guess.
> >
> > Regarding the reproducibility, I can only say right now that this
> > error occurred twice. I will re-run the jobs, see if the errors can
> > be observed again, and let you know.
> >
> > Cheers,
> > Sebastian
> >
> > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.EOFException
> > at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> >
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Wednesday, June 3, 2015 10:33
> > To: dev@flink.apache.org
> > Subject: Re: Buffer re-ordering problem
> >
> > This is a critical bug.
> >
> > - Which version are you using? If a snapshot, which commit?
> > - What is your setup? Number of machines, dataset, etc.?
> > - Is it reproducible?
> >
> > On Wednesday, June 3, 2015, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> > >
> > > Hi everyone,
> > >
> > > I had some jobs running overnight, and in two of them the following
> > > exception occurred after about half an hour. Do you know why this
> > > happens?
> > >
> > > Thanks,
> > > Sebastian
> > >
> > > tenem16.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > at java.lang.Thread.run(Thread.java:745)
> > >
> > >
> > > ---
> > > Sebastian Kruse
> > > PhD candidate, Information Systems Group
> > > Hasso-Plattner-Institut an der Universität Potsdam
> > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > Tel +49 331 5509 240
> > > Registered at Amtsgericht Potsdam, HRB 12184
> > > Management: Prof. Dr. Christoph Meinel
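A footnote on the java.io.EOFException in the first stack trace: once a buffer is lost or truncated, the deserializer simply runs off the end of its input mid-record, and that is exactly how such corruption tends to surface. A small illustration of the failure mode with plain java.io streams (not Flink's DataInputDeserializer; class and string names here are illustrative only):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

public class TruncatedRecordDemo {
    public static void main(String[] args) throws IOException {
        // Write a record the way DataOutput does: length prefix + payload.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new DataOutputStream(bos).writeUTF("hello");
        byte[] full = bos.toByteArray();

        // Simulate a lost/partial buffer by dropping the last two bytes.
        byte[] truncated = Arrays.copyOf(full, full.length - 2);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(truncated));
        try {
            in.readUTF(); // the length prefix promises more bytes than exist
        } catch (EOFException e) {
            // The reader hits the end of the stream in the middle of a record.
            System.out.println("EOFException mid-record, as in the trace");
        }
    }
}
```

This is why a single skipped network buffer can surface far downstream as an EOFException inside a StringSerializer rather than as a network error.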