Hi all,

Thanks a lot for your work for the community!

This question is more of a discussion than a bug report.
Currently, I'm experiencing an *OutOfMemoryError* when writing files
from Kafka into HDFS using *BucketingSink*.

The log looks like this:

2017-06-10 08:58:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33283, GC
COUNT: 977], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Memory
usage stats: [HEAP: 14080/20480/20480 MB, NON HEAP: 81/83/-1 MB
(used/committed/max)]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 16846, Total Capacity: 443738663, Used Memory:
443738664
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Off-heap pool stats: [Code Cache: 24/25/240 MB (used/committed/max)],
[Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class
Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33311, GC
COUNT: 978], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:00:20,485 INFO
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl
 - backgroundOperationsLoop exiting
2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ZooKeeper
                     - Session: 0x55591b10666ea92 closed
2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ClientCnxn
                     - EventThread shut down
2017-06-10 09:00:34,999 INFO
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl
 - backgroundOperationsLoop exiting
2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ZooKeeper
                     - Session: 0x55591b10666ea94 closed
2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ClientCnxn
                     - EventThread shut down
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Memory
usage stats: [HEAP: 12820/20480/20480 MB, NON HEAP: 81/83/-1 MB
(used/committed/max)]
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 17438, Total Capacity: 458405794, Used Memory:
458405795
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Off-heap pool stats: [Code Cache: 25/25/240 MB (used/committed/max)],
[Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class
Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33339, GC
COUNT: 979], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:01:04,962 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Source: Custom Source -> Sink: Unnamed (7/22)
(57d3c79ae13fd06de79ca6cb8f1431b4) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170)
    at 
org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:126)
    at 
org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62)
    at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
    at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
    at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
    at 
org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
2017-06-10 09:01:04,982 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Freeing task resources for Source: Custom Source -> Sink: Unnamed
(7/22) (57d3c79ae13fd06de79ca6cb8f1431b4).
2017-06-10 09:01:04,989 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Ensuring all FileSystem streams are closed for task Source: Custom
Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4)
[FAILED]
2017-06-10 09:01:04,989 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Un-registering task and sending final execution state FAILED to
JobManager for task Source: Custom Source -> Sink: Unnamed
(57d3c79ae13fd06de79ca6cb8f1431b4)
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Attempting to cancel task Source: Custom Source -> Sink: Unnamed
(1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Source: Custom Source -> Sink: Unnamed (1/22)
(f64b613bcb366952d716d57913e01acf) switched from RUNNING to CANCELING.
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Triggering cancellation of task code Source: Custom Source -> Sink:
Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,033 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Attempting to cancel task Source: Custom Source -> Sink: Unnamed
(4/22) (956689ad000ce02f128dc3147641736c).
2017-06-10 09:01:05,033 INFO  org.apache.flink.runtime.taskmanager.Task


As the memory monitoring shows, there is still plenty of free memory in
the heap, so I'm not sure this should really surface as an
*OutOfMemoryError*. The message "unable to create new native thread"
suggests the process hit an OS-level thread limit rather than actually
exhausting the heap.
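For what it's worth, here is a minimal sketch (not from my job, just an illustration) of how one could log the JVM's live and peak thread counts next to the heap stats, to confirm whether thread count rather than heap usage is what is growing before the error:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class ThreadCountCheck {
    public static void main(String[] args) {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // "unable to create new native thread" is raised when the OS refuses
        // to allocate another native thread (e.g. the per-user process/thread
        // limit is reached), regardless of how much heap is still free.
        System.out.println("Live threads: " + threads.getThreadCount());
        System.out.println("Peak threads: " + threads.getPeakThreadCount());
    }
}
```

If the live thread count climbs steadily while the heap stays flat, that would point at thread leakage (each open HDFS output stream starts its own streamer thread, per the stack trace) rather than a heap problem.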

I was using fs.hdfs.hadoopconf to set up my HDFS client. Is there any
possibility that this error originates on the HDFS client's side? The
stack trace shows the failure happens when DFSOutputStream tries to
start a new thread for a freshly opened part file.

If so, maybe the error message should be adjusted a little to make the
actual cause clearer?
