Re: Flink job throws ClassNotFoundException on job restart
Hi Stephan,

thanks for the quick answer! I'll try going back to an older revision.

Best,
Max

2016-10-05 12:10 GMT+02:00 Stephan Ewen:

> Hi!
>
> The master has a temporary regression due to the work in progress for the
> "changing parallelism of savepoints" feature. We'll try to complete the
> change today, then it should work again.
>
> Sorry for the inconvenience. Can you work with a revision from last week
> for today?
>
> Stephan
>
> On Wed, Oct 5, 2016 at 11:50 AM, none none wrote:
>
>> I'm running Flink on YARN with two taskmanagers. I wrote a simple job
>> that consumes messages from Kafka. The job runs on taskmanager 1. When I
>> kill taskmanager 1 (via *kill PID*), the job gets restarted on
>> taskmanager 2. So far so good. But right after starting the consumer, the
>> execution fails:
>>
>> java.lang.RuntimeException: Could not deserialize NFA.
>>   at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
>>   at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
>>   at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>   at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>   at java.lang.Class.forName0(Native Method)
>>   at java.lang.Class.forName(Class.java:348)
>>   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
>>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>>   at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:83)
>>   ... 8 more
>>
>> I build the jar file with:
>>
>> mvn clean package -Pbuild-jar
>>
>> I also tried this, but it makes no difference:
>>
>> mvn clean package
>>
>> It's strange that my job runs fine on the first attempt, but I get CNFEs
>> on job restart. What am I doing wrong? (I'm using Flink 1.2-SNAPSHOT
>> because I need the BucketSink.) I compared the classpaths of both
>> taskmanagers; they are equal.
Re: Flink job throws ClassNotFoundException on job restart
I went back to commit 2afc092461cf68cf0f3c26a3ab4c58a7bd68cf71 on master; that seems to work.

2016-10-05 15:48 GMT+02:00 static-max:

> Hi Stephan,
>
> thanks for the quick answer! I'll try going back to an older revision.
>
> Best,
> Max
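PS: For reference, roughly the steps I used to build from that commit (assuming a plain clone of the Flink repository; -DskipTests just skips the test run to speed up the build):

git clone https://github.com/apache/flink.git
cd flink
git checkout 2afc092461cf68cf0f3c26a3ab4c58a7bd68cf71
mvn clean install -DskipTests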
"Slow ReadProcessor" warnings when using BucketSink
Hi,

I have a low-throughput job (approx. 1000 messages per minute) that consumes from Kafka and writes directly to HDFS. After an hour or so, I get the following warnings in the taskmanager log:

2016-10-10 01:59:44,635 WARN org.apache.hadoop.hdfs.DFSClient - Slow ReadProcessor read fields took 30001ms (threshold=30000ms); ack: seqno: 66 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 1599276 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[Node1, Node2, Node3]]
2016-10-10 02:04:44,635 WARN org.apache.hadoop.hdfs.DFSClient - Slow ReadProcessor read fields took 30002ms (threshold=30000ms); ack: seqno: 13 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 2394027 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[Node1, Node2, Node3]]
2016-10-10 02:05:14,635 WARN org.apache.hadoop.hdfs.DFSClient - Slow ReadProcessor read fields took 30001ms (threshold=30000ms); ack: seqno: 17 reply: SUCCESS reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 2547467 flag: 0 flag: 0 flag: 0, targets: [DatanodeInfoWithStorage[Node1, Node2, Node3]]

I have not found any errors or warnings at the datanodes or the namenode. Every other application using HDFS performs fine. I have very little load, and network latency is fine as well. I also checked GC and disk I/O.

The files written are very small (only a few MB), so writing the blocks should be fast.

The threshold is exceeded by only 1 or 2 ms, which makes me wonder.

Does anyone have an idea where to look next or how to fix these warnings?
PathIsNotEmptyDirectoryException in Namenode HDFS log when using Jobmanager HA in YARN
Hi,

I get many (multiple times per minute) errors in my namenode HDFS logfile:

2016-10-11 17:17:07,596 INFO ipc.Server (Server.java:logException(2401)) - IPC Server handler 295 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from datanode1:34872 Call#2361 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException: `/flink/recovery is non empty': Directory is not empty
  at org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:89)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3829)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.delete(NameNodeRpcServer.java:1071)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.delete(ClientNamenodeProtocolServerSideTranslatorPB.java:619)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)

That is the directory I configured for jobmanager HA. I deleted it before starting the YARN session, but that did not help. The folder gets created by Flink without problems.

I'm using the latest Flink master (commit 6731ec1), built for Hadoop 2.7.3.

Any idea is highly appreciated. Thanks a lot!
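PS: For reference, the HA-related part of my flink-conf.yaml looks roughly like this (hostnames simplified; key names as in the current 1.2-SNAPSHOT docs, on 1.1 these are still the recovery.* keys):

high-availability: zookeeper
high-availability.zookeeper.quorum: zkhost1:2181,zkhost2:2181,zkhost3:2181
high-availability.zookeeper.storageDir: hdfs:///flink/recovery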
Re: "Slow ReadProcessor" warnings when using BucketSink
Hi Robert,

thanks for your reply. I also didn't find anything helpful on Google. I checked all GC times; they look OK. Here are the GC times for the jobmanager (the job has been running fine for 5 days):

Collector     Count  Time
PS-MarkSweep      3  1s
PS-Scavenge    5814  2m 12s

I have no window or any computation, just reading from Kafka and directly writing to HDFS. I can also run a terasort or teragen in parallel without any problems.

Best,
Max

2016-10-12 11:32 GMT+02:00 Robert Metzger:

> Hi,
> I haven't seen this error before. Also, I didn't find anything helpful
> searching for the error on Google.
>
> Did you check the GC times also for Flink? Is your Flink job doing any
> heavy tasks (like maintaining large windows, or other operations involving
> a lot of heap space)?
>
> Regards,
> Robert
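If nothing else turns up, I might just raise the client-side warning threshold. If I read the DFSClient code correctly, the warning is governed by dfs.client.slow.io.warning.threshold.ms (default 30000), so something like this in the hdfs-site.xml that the Flink taskmanagers pick up should quiet it (untested):

<property>
  <!-- Raise the DFSClient slow-ack warning threshold from 30s to 60s -->
  <name>dfs.client.slow.io.warning.threshold.ms</name>
  <value>60000</value>
</property>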
"Too many open files" in Job Manager
Hi,

I get a ton of these messages in my jobmanager's logfile. This makes Flink unstable, as I cannot list or cancel/stop the jobs.

I run Flink in YARN under a default Hortonworks HDP 2.5 installation. HDP sets the hard and soft limit of open files to 32768 for the user "yarn" that runs the Flink JVMs, so that should not be an issue. I also checked the number of open files for user "yarn" with "lsof -u yarn | wc -l" and got ~4000 open files when the errors occurred in the logs, so there should be room for more.

Any idea how to solve this?

Thanks, Max

2016-11-12 10:23:04,422 WARN org.jboss.netty.channel.socket.nio.AbstractNioSelector - Failed to accept a connection.
java.io.IOException: Too many open files
  at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
  at org.jboss.netty.channel.socket.nio.NioServerBoss.process(NioServerBoss.java:100)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
  at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

2016-11-12 10:23:04,898 WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.io.IOException: Too many open files
  at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
  at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
  at io.netty.channel.socket.nio.NioServerSocketChannel.doReadMessages(NioServerSocketChannel.java:135)
  at io.netty.channel.nio.AbstractNioMessageChannel$NioMessageUnsafe.read(AbstractNioMessageChannel.java:69)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
  at java.lang.Thread.run(Thread.java:745)
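PS: In case it helps, this is how I double-checked the limit of the jobmanager process itself rather than the whole "yarn" user (ulimits apply per process; paths assume Linux, and the grep pattern is just a guess for your deployment):

# Find the jobmanager JVM's PID (the main class name may differ per setup)
jps -l | grep -i jobmanager

# The open-files limit that actually applies to that process
grep "open files" /proc/<PID>/limits

# Number of file descriptors the process currently holds
ls /proc/<PID>/fd | wc -l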
Re: PathIsNotEmptyDirectoryException in Namenode HDFS log when using Jobmanager HA in YARN
Hi Stephan,

it's not a problem as such, but it makes finding other errors on my namenode complicated, as I get this error message every minute. Can't we just delete the directory recursively?

Regards,
Max

2016-10-11 17:59 GMT+02:00 Stephan Ewen:

> Hi!
>
> I think to some extent this is expected. There is some cleanup code that
> deletes files and then issues parent-directory remove requests. It relies
> on the fact that the parent directory is only removed if it is empty (after
> the last file was deleted).
>
> Is this a problem right now, or just confusing behavior?
>
> Greetings,
> Stephan
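By "recursively" I mean something along these lines via the Hadoop FileSystem API (just a sketch of the idea, not what Flink's cleanup code actually does):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DeleteRecoveryDir {
    public static void main(String[] args) throws Exception {
        // Picks up core-site.xml / hdfs-site.xml from the classpath
        FileSystem fs = FileSystem.get(new Configuration());
        // The second argument enables recursive deletion, so the call
        // succeeds even when /flink/recovery still contains files.
        boolean deleted = fs.delete(new Path("/flink/recovery"), true);
        System.out.println("Deleted: " + deleted);
    }
}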
Re: PathIsNotEmptyDirectoryException in Namenode HDFS log when using Jobmanager HA in YARN
Update: I deleted the /flink/recovery folder on HDFS, and even then I get the same exception after the next checkpoint.

2016-11-21 21:51 GMT+01:00 static-max:

> Hi Stephan,
>
> it's not a problem as such, but it makes finding other errors on my
> namenode complicated, as I get this error message every minute.
> Can't we just delete the directory recursively?
>
> Regards,
> Max