Hi everyone,

I just wanted to let you know that after quite a few more runs on different machines, the buffer reordering problem did not appear again. I don't know what caused it; maybe it really was due to the potentially invalid UTF-16 content within the strings. If the error ever happens again, I will let you know, but otherwise it might just have been some strange edge-case side effect. :)
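To illustrate what I mean by that: this is not the actual rdfind code, just a minimal Java sketch of the pattern I had used and why it can produce ill-formed UTF-16, plus the Base64 variant I switched to.

    import java.util.Base64;

    public class BinaryKeyExample {

        // True iff the String contains no unpaired surrogates, i.e. it is well-formed UTF-16.
        static boolean isWellFormedUtf16(String s) {
            for (int i = 0; i < s.length(); i++) {
                char c = s.charAt(i);
                if (Character.isHighSurrogate(c)) {
                    if (i + 1 == s.length() || !Character.isLowSurrogate(s.charAt(i + 1))) {
                        return false; // high surrogate without a following low surrogate
                    }
                    i++; // skip the valid surrogate pair
                } else if (Character.isLowSurrogate(c)) {
                    return false; // low surrogate without a preceding high surrogate
                }
            }
            return true;
        }

        public static void main(String[] args) {
            byte[] binary = {(byte) 0xD8, 0x00, 0x00, 0x41}; // some arbitrary binary key data

            // Risky: pack the raw bytes into a char[] and wrap it in a String.
            // Any char that lands in the surrogate range (0xD800-0xDFFF) without its
            // counterpart makes the resulting String ill-formed UTF-16.
            char[] chars = new char[binary.length / 2];
            for (int i = 0; i < chars.length; i++) {
                chars[i] = (char) (((binary[2 * i] & 0xFF) << 8) | (binary[2 * i + 1] & 0xFF));
            }
            String riskyKey = new String(chars);
            System.out.println("char[]-wrapped key well-formed: " + isWellFormedUtf16(riskyKey)); // false here

            // Safe: Base64 yields plain ASCII, which is always well-formed UTF-16.
            String safeKey = Base64.getEncoder().encodeToString(binary);
            System.out.println("Base64 key well-formed: " + isWellFormedUtf16(safeKey)); // true
        }
    }

Whether such an ill-formed String can actually trip up the StringValue (de-)serialization is of course still an open question.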
Cheers,
Sebastian

-----Original Message-----
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
Sent: Thursday, June 4, 2015 13:13
To: dev@flink.apache.org
Subject: Re: Buffer re-ordering problem

Thanks for helping us debug this.

You can start many TaskManagers in one JVM by using the LocalMiniCluster. Have a look at this (manually triggered) test, which runs 100 TaskManagers in one JVM:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java

The data type serialization should have no impact on the network stream, as that layer is buffer-oriented and has no notion of the types inside. That transition happens later, in the readers/writers. Of course, it is not impossible that there is a bug in there such that the serialization affects the buffer transport.

Thanks for the pointer, we will definitely look into that.
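For reference, a rough, untested sketch of what starting several TaskManagers in a single JVM can look like. The class and config-constant names below are best guesses and may differ between Flink versions; the linked NotSoMiniClusterIterations test is the authoritative example.

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

    public class ManyTaskManagersInOneJvm {

        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();

            // Ten TaskManagers with one slot each, all inside this JVM.
            // NOTE: the constant for the number of local TaskManagers has been renamed
            // across releases (older versions use LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER),
            // so check the ConfigConstants of your Flink version.
            config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 10);
            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

            // Keep the per-TaskManager managed memory small, since they all share one heap.
            config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 128);

            // Depending on the version, the constructor starts the cluster directly
            // or an explicit start() call is required.
            LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
            try {
                // Submit jobs against this cluster's JobManager here
                // (see the NotSoMiniClusterIterations test for the full wiring).
            } finally {
                cluster.shutdown();
            }
        }
    }

Data exchanged between different TaskManagers should still go through the full serialization and network stack, even when they all run on one machine, so a setup like this would also cover the single-machine experiment below.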
On Thu, Jun 4, 2015 at 1:07 PM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> Thanks for your feedback. I am neither running IPSec nor the aesni-intel module.
>
> So far, I could not reproduce the reordering issue. I have also noticed that my code might have created String objects with invalid UTF-16 content in exactly those jobs that suffered from the reordering. I wanted to use binary data as a key, so I put it into a char array and wrapped it in a String, hoping that would do the trick. Maybe this caused some error in the String (de-)serialization with further consequences, e.g., the skipping of a buffer. Since I switched to encoding the binary data in Base64, the reordering error has not popped up again. I will do some more runs to verify this.
>
> However, the other problem with the serialization exceptions should not be affected by this. To see if network stream corruption is an issue, I am now also performing some runs on a larger single machine with a single task manager, so that no network communication is involved. Serialization should take place anyway, right? Is there a way to run a second task manager on the same machine within the same OS?
>
> Cheers,
> Sebastian
>
> -----Original Message-----
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
> Sent: Wednesday, June 3, 2015 20:14
> To: dev@flink.apache.org
> Subject: Re: Buffer re-ordering problem
>
> Hi Sebastian!
>
> The first error that you report looks like it could also be a lost buffer rather than a reordering.
> The second error that you posted seems to be a serialization issue.
> Both can be consequences of network stream corruption.
>
> It would be good to figure out why the netty event loop does not encode/decode this properly in your case.
>
> BTW: I would assume we can virtually rule out TCP/kernel bugs as sources of the corruption, unless you have this setup here:
> http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/
>
> Stephan
>
> On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:
> > Hey Sebastian,
> >
> > would you mind sharing the code and dataset with me (privately would work fine)? I want to try to reproduce this as well.
> >
> > – Ufuk
> >
> > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an older build, but I recently manually updated the flink-dist.jar with commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> > >
> > > Our setup consists of 10 workers and a master, all interconnected via a switch (100 Mbit, I think). The data set is an NTriple file of about 8 GB; however, intermediate datasets might be much larger. For smaller datasets, I could not observe this problem yet.
> > >
> > > Also, during the failure there are a lot of concurrent shuffles ongoing [1, 2]. Additionally, it might be interesting that in the affected jobs either this exception occurs or another one that looks like a network disruption [3]. So it might well be that our setup suffers from occasional network errors, especially during high network load - but that's just a plain guess.
> > >
> > > Regarding reproducibility, I can only say right now that this error has occurred twice. I will re-run the jobs, see if the errors can be observed again, and let you know.
> > >
> > > Cheers,
> > > Sebastian
> > >
> > > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > >     at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > >     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > >     ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: java.io.EOFException
> > >     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > >     at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > >     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > >     at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > >     at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > >     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > >     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > >     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > >     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > >     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > >     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> > >
> > >
> > > -----Original Message-----
> > > From: Ufuk Celebi [mailto:u...@apache.org]
> > > Sent: Wednesday, June 3, 2015 10:33
> > > To: dev@flink.apache.org
> > > Subject: Re: Buffer re-ordering problem
> > >
> > > This is a critical bug.
> > >
> > > - which version are you using? If snapshot, which commit?
> > > - what is your setup? Number of machines, dataset etc.?
> > > - is it reproducible?
> > >
> > > On Wednesday, June 3, 2015, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> > > > Hi everyone,
> > > >
> > > > I had some jobs running overnight, and in two of them the following exception occurred after about half an hour. Do you know why this happens?
> > > >
> > > > Thanks,
> > > > Sebastian
> > > >
> > > > tenem16.hpi.uni-potsdam.de
> > > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > >     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > >     ... 3 more
> > > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > > >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > >     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > >     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > >     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > > >     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > > >     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > >     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > >     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > >     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > >     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > >
> > > >
> > > > ---
> > > > Sebastian Kruse
> > > > Doctoral student, Information Systems Group
> > > > Hasso-Plattner-Institut an der Universität Potsdam
> > > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > > Tel +49 331 5509 240
> > > > Amtsgericht Potsdam, HRB 12184
> > > > Management: Prof. Dr. Christoph Meinel