@Stephan: my Flink cluster setup is 5 nodes, each running 1 TaskManager. Slots per TaskManager: 2-4 (I varied this to see whether it has any impact). Network buffers: 5k-20k (I tried different values here as well).
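For concreteness, the relevant part of my flink-conf.yaml looks roughly like this (a sketch of the knobs I have been varying, not the exact file):

    # flink-conf.yaml (sketch -- exact values varied between runs)
    taskmanager.numberOfTaskSlots: 4              # tried 2, 3 and 4
    taskmanager.network.numberOfBuffers: 20000    # tried 5000 up to 20000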
@Ufuk: Thank you for creating a branch with the checksum. I will use this build to test the jobs and post here what I learn.

Best,
Tarandeep

On Wed, Oct 5, 2016 at 4:53 AM, Ufuk Celebi <u...@apache.org> wrote:
> @Tarandeep and Flavio: +1 to Stephan's question.
>
> Furthermore, I've created a branch which adds a simple CRC32 checksum
> calculation over the network buffer content here:
> https://github.com/uce/flink/tree/checksum
>
> It would be great if you could run your job with a build from this
> branch. It's based on the current 1.1 release branch. If you need
> help building and running from this branch, feel free to ping me.
>
> git clone https://github.com/uce/flink.git flink-uce
> cd flink-uce
> git checkout -b checksum origin/checksum
> mvn clean install -DskipTests
>
> The built binary distro is found in
> flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT. Just copy
> your config files to the conf dir there and start Flink as usual in
> that directory.
>
> If the checksums don't match, the job should fail with an Exception. If
> this happens, it is likely that the problems are caused by data
> corruption on the network layer. If not, it's more likely that there
> is something off with the Kryo serializers.
>
> (You can also follow this guide depending on your Hadoop requirements:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#hadoop-versions)
>
>
> On Tue, Oct 4, 2016 at 7:10 PM, Stephan Ewen <se...@apache.org> wrote:
> > It would be great to know if this only occurs in setups where Netty is
> > involved (more than one TaskManager, and at least one shuffle/rebalance)
> > or also in one-TaskManager setups (which have local channels only).
> >
> > Stephan
> >
> > On Tue, Oct 4, 2016 at 11:49 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> >>
> >> Hi Tarandeep,
> >>
> >> it would be great if you could compile a small example data set with which
> >> you're able to reproduce your problem. We could then try to debug it. It
> >> would also be interesting to know whether Flavio's fix solves your problem
> >> or not.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Mon, Oct 3, 2016 at 10:26 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>
> >>> I think you're running into the same exception I face sometimes. I've
> >>> opened a JIRA for it [1]. Could you please try to apply that patch and see
> >>> if things get better?
> >>>
> >>> [1] https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-4719
> >>>
> >>> Best,
> >>> Flavio
> >>>
> >>> On 3 Oct 2016 22:09, "Tarandeep Singh" <tarand...@gmail.com> wrote:
> >>>>
> >>>> Now, when I ran it again (with lower task slots per machine) I got a
> >>>> different error-
> >>>>
> >>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >>>>     ....
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: javaec40-d994-yteBuffer
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> >>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.ClassNotFoundException: javaec40-d994-yteBuffer
> >>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>>     at java.lang.Class.forName0(Native Method)
> >>>>     at java.lang.Class.forName(Class.java:348)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> >>>>     ... 15 more
> >>>>
> >>>> -Tarandeep
> >>>>
> >>>> On Mon, Oct 3, 2016 at 12:49 PM, Tarandeep Singh <tarand...@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am using flink-1.0.0 and have been running ETL (batch) jobs on it for quite
> >>>>> some time (a few months) without any problem. Starting this morning, I have
> >>>>> been getting errors like these-
> >>>>>
> >>>>> "Received an event in channel 3 while still having data from a record.
> >>>>> This indicates broken serialization logic. If you are using custom
> >>>>> serialization code (Writable or Value types), check their serialization
> >>>>> routines. In the case of Kryo, check the respective Kryo serializer."
> >>>>>
> >>>>> My datasets are in Avro format. The only thing that changed today is that
> >>>>> I moved to a smaller cluster. When I first ran the ETL jobs, they failed with
> >>>>> this error-
> >>>>>
> >>>>> "Insufficient number of network buffers: required 20, but only 10
> >>>>> available. The total number of network buffers is currently set to 20000.
> >>>>> You can increase this number by setting the configuration key
> >>>>> 'taskmanager.network.numberOfBuffers'"
> >>>>>
> >>>>> I increased the number of buffers to 30k. I also decreased the number of slots
> >>>>> per machine, as I noticed the load per machine was too high. After that, when I
> >>>>> restart the jobs, I get the above error.
> >>>>>
> >>>>> Can someone please help me debug it?
> >>>>>
> >>>>> Thank you,
> >>>>> Tarandeep
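While testing with the checksum build, I will also try to take Kryo out of the picture for my Avro records, to see whether the problem follows the serializer or the network. Here is a minimal sketch of that experiment (assuming my records are Avro-generated classes; the tiny placeholder pipeline below just stands in for the real Avro-reading ETL job):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ForceAvroSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Force Flink's Avro serializer for POJO-style types instead of letting
            // generic fields fall back to Kryo, so the Kryo path can be ruled in or out.
            env.getConfig().enableForceAvro();

            // Placeholder pipeline; the real job reads Avro files and runs
            // flatMap/join stages across the 5 TaskManagers.
            DataSet<String> probe = env.fromElements("serialization", "probe");
            probe.print();   // print() triggers execution in the batch API
        }
    }

Whether this actually changes which serializer is picked will depend on the concrete record types, so I will treat it as an experiment rather than a fix and report back.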