Hi all,

Thanks a lot for your work for the community!
This question is more of a discussion. Currently, I'm experiencing an *OutOfMemoryError* when writing files from Kafka into HDFS using *BucketingSink*. The TaskManager log looks like this:

2017-06-10 08:58:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33283, GC COUNT: 977], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 08:59:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 14080/20480/20480 MB, NON HEAP: 81/83/-1 MB (used/committed/max)]
2017-06-10 08:59:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 16846, Total Capacity: 443738663, Used Memory: 443738664
2017-06-10 08:59:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 24/25/240 MB (used/committed/max)], [Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 08:59:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33311, GC COUNT: 978], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:00:20,485 INFO org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2017-06-10 09:00:20,488 INFO org.apache.zookeeper.ZooKeeper - Session: 0x55591b10666ea92 closed
2017-06-10 09:00:20,488 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
2017-06-10 09:00:34,999 INFO org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
2017-06-10 09:00:35,001 INFO org.apache.zookeeper.ZooKeeper - Session: 0x55591b10666ea94 closed
2017-06-10 09:00:35,001 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
2017-06-10 09:00:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 12820/20480/20480 MB, NON HEAP: 81/83/-1 MB (used/committed/max)]
2017-06-10 09:00:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 17438, Total Capacity: 458405794, Used Memory: 458405795
2017-06-10 09:00:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 25/25/240 MB (used/committed/max)], [Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 09:00:42,248 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33339, GC COUNT: 979], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:01:04,962 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:126)
    at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
    at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
2017-06-10 09:01:04,982 INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4).
2017-06-10 09:01:04,989 INFO org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4) [FAILED]
2017-06-10 09:01:04,989 INFO org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> Sink: Unnamed (57d3c79ae13fd06de79ca6cb8f1431b4)
2017-06-10 09:01:05,025 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,025 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf) switched from RUNNING to CANCELING.
2017-06-10 09:01:05,025 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Source: Custom Source -> Sink: Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,033 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to cancel task Source: Custom Source -> Sink: Unnamed (4/22) (956689ad000ce02f128dc3147641736c).
2017-06-10 09:01:05,033 INFO org.apache.flink.runtime.taskmanager.Task

As the memory monitoring suggests, there is still plenty of free memory in the heap, so I'm not sure this should really be reported as an OutOfMemoryError. I was using fs.hdfs.hadoopconf to set up my HDFS client. Is there any possibility that this error is caused by the HDFS client side? If so, maybe we should change the error message a little bit?
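For reference, the job is roughly the following. This is only a simplified sketch, not the exact code: the topic name, Kafka properties, output path and parallelism are placeholders (the 0.8 consumer and StringWriter match what shows up in the stack trace above).

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaToHdfsJob {
        public static void main(String[] args) throws Exception {
            // Kafka 0.8 consumer settings -- hosts and group id are placeholders
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "zk-host:2181");
            props.setProperty("bootstrap.servers", "kafka-host:9092");
            props.setProperty("group.id", "my-group");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(22);  // 22 sub-tasks, as in the log above

            // Source: read strings from Kafka
            DataStream<String> stream = env.addSource(
                    new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));

            // Sink: roll files into HDFS via BucketingSink + StringWriter
            BucketingSink<String> sink = new BucketingSink<>("hdfs:///path/to/output");
            sink.setWriter(new StringWriter<>());

            stream.addSink(sink);
            env.execute("kafka-to-hdfs");
        }
    }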
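In case it matters, the only HDFS-related setting I have in flink-conf.yaml is fs.hdfs.hadoopconf, pointing at the Hadoop configuration directory (the path here is just a placeholder), so the client reads core-site.xml and hdfs-site.xml from there:

    fs.hdfs.hadoopconf: /etc/hadoop/conf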