Hi Ted,

Thanks for the useful link there.

Actually, I just found out that the error might be triggered by a Kerberos
issue in our system. I'll use the ticket cache and execute kinit from
crontab to see if that is the case -- something like the sketch below.
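For reference, this is roughly the crontab entry I have in mind (the keytab
path and principal are just placeholders for our real ones):

# Placeholder keytab path and principal -- adjust to the actual setup.
# Re-acquires the Kerberos TGT into the ticket cache every 8 hours, so the
# long-running TaskManager processes never work with an expired ticket.
0 */8 * * * /usr/bin/kinit -kt /etc/security/keytabs/flink.keytab flink/host.example.com@EXAMPLE.COM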
But thanks for the link you provided. I'll definitely check it :)

Best regards,
Mu

On Mon, Jun 12, 2017 at 10:45 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you see if the following post helps in troubleshooting?
>
> https://blog.fastthread.io/2016/07/06/troubleshoot-outofmemoryerror-unable-to-create-new-native-thread/
>
> Thanks
>
> On Sun, Jun 11, 2017 at 6:01 PM, Mu Kong <kong.mu....@gmail.com> wrote:
>
> > Hi all,
> >
> > Thanks a lot for your work for the community!
> >
> > This question is more of a discussion.
> > Currently, I'm experiencing an *OutOfMemoryError* when writing files
> > from Kafka into HDFS using *BucketingSink*.
> >
> > The log looks like this:
> >
> > 2017-06-10 08:58:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33283, GC COUNT: 977], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
> > 2017-06-10 08:59:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 14080/20480/20480 MB, NON HEAP: 81/83/-1 MB (used/committed/max)]
> > 2017-06-10 08:59:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 16846, Total Capacity: 443738663, Used Memory: 443738664
> > 2017-06-10 08:59:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 24/25/240 MB (used/committed/max)], [Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class Space: 6/6/1024 MB (used/committed/max)]
> > 2017-06-10 08:59:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33311, GC COUNT: 978], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
> > 2017-06-10 09:00:20,485 INFO  org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
> > 2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x55591b10666ea92 closed
> > 2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
> > 2017-06-10 09:00:34,999 INFO  org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
> > 2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ZooKeeper - Session: 0x55591b10666ea94 closed
> > 2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ClientCnxn - EventThread shut down
> > 2017-06-10 09:00:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 12820/20480/20480 MB, NON HEAP: 81/83/-1 MB (used/committed/max)]
> > 2017-06-10 09:00:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 17438, Total Capacity: 458405794, Used Memory: 458405795
> > 2017-06-10 09:00:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 25/25/240 MB (used/committed/max)], [Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class Space: 6/6/1024 MB (used/committed/max)]
> > 2017-06-10 09:00:42,248 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33339, GC COUNT: 979], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
> > 2017-06-10 09:01:04,962 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4) switched from RUNNING to FAILED.
> > java.lang.OutOfMemoryError: unable to create new native thread
> >         at java.lang.Thread.start0(Native Method)
> >         at java.lang.Thread.start(Thread.java:714)
> >         at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170)
> >         at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685)
> >         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
> >         at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
> >         at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
> >         at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
> >         at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> >         at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
> >         at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
> >         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
> >         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
> >         at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
> >         at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:126)
> >         at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62)
> >         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> >         at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> >         at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> >         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> >         at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> >         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
> >         at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
> > 2017-06-10 09:01:04,982 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4).
> > 2017-06-10 09:01:04,989 INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4) [FAILED]
> > 2017-06-10 09:01:04,989 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Unnamed (57d3c79ae13fd06de79ca6cb8f1431b4)
> > 2017-06-10 09:01:05,025 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
> > 2017-06-10 09:01:05,025 INFO  org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf) switched from RUNNING to CANCELING.
> > 2017-06-10 09:01:05,025 INFO  org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
> > 2017-06-10 09:01:05,033 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (4/22) (956689ad000ce02f128dc3147641736c).
> > 2017-06-10 09:01:05,033 INFO  org.apache.flink.runtime.taskmanager.Task
> >
> > As the memory monitoring suggests, there is still plenty of free memory
> > in the heap. So I'm not sure whether this should be an OutOfMemoryError.
> >
> > I was using fs.hdfs.hadoopconf to set up my HDFS client. Is there any
> > possibility that this error is caused on the HDFS client's side?
> >
> > If so, maybe we should change the error message a little bit?
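P.S. For anyone who finds this thread later: as far as I understand, "unable
to create new native thread" is raised when the OS refuses to create another
thread (per-user process/thread limit, or no native memory left for the
thread stack), not when the JVM heap is exhausted -- which would explain why
our heap still had plenty of free space. A quick sanity check on the
TaskManager host (the PID below is a placeholder for the actual TaskManager
process):

# Count the native threads the TaskManager JVM currently owns.
ps -o nlwp= -p <taskmanager-pid>

# Show the per-user limit on processes/threads; once the count above
# approaches this limit, Thread.start() fails with exactly this error.
ulimit -u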