OK, thanks for reporting back. Thanks to Igor as well. I just updated the docs with a note about this.
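For anyone finding this thread later, the note boils down to roughly the following (paths are placeholders, and fs.hdfs.hadoopconf is, as far as I recall, the Flink option that points at the Hadoop configuration directory):

core-site.xml on the machines running the task managers:

  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>

flink-conf.yaml, so that Flink actually picks up that Hadoop configuration:

  fs.hdfs.hadoopconf: /path/to/hadoop/conf

As mentioned below, fs.s3a.buffer.dir can also be a comma-separated list of local directories if you want to spread the buffering over several disks.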
On Thu, May 5, 2016 at 3:16 AM, Chen Qin <qinnc...@gmail.com> wrote:
> Ufuk & Igor,
>
> Thanks for helping out! Yup, it fixed my issue.
>
> Chen
>
>
> On Wed, May 4, 2016 at 12:57 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>
>> I think I've had this issue too and fixed it as Ufuk suggested,
>> in core-site.xml:
>>
>> something like
>> <property>
>>   <name>fs.s3a.buffer.dir</name>
>>   <value>/tmp</value>
>> </property>
>>
>>
>> On 4 May 2016 at 11:10, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Chen Qin,
>>>
>>> this seems to be an issue with the S3 file system. The root cause is:
>>>
>>> Caused by: java.lang.NullPointerException
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>>>   at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>>>   at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
>>>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>>>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>>>   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>>>   ... 25 more
>>>
>>> From [1] it looks like you have to specify
>>>
>>>   fs.s3a.buffer.dir
>>>
>>> in the Hadoop configuration (where you set up the S3 file system).
>>>
>>> The expected value is a comma-separated list of local directories used
>>> to buffer results prior to transmitting them to S3 (for large files).
>>>
>>> Does this fix the issue? Please report back so that we can include it in
>>> the "common issues" section of the AWS docs.
>>>
>>> – Ufuk
>>>
>>> [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
>>>
>>>
>>> On Wed, May 4, 2016 at 2:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
>>> > Hi there,
>>> >
>>> > I ran a test job with the file state backend, saving checkpoints to S3 (via s3a).
>>> >
>>> > The job crashes when a checkpoint is triggered. Looking into the S3 bucket and
>>> > listing objects, I found the directories are created successfully, but all
>>> > checkpoint directories are empty.
>>> >
>>> > The host running the task manager shows the following error:
>>> >
>>> > Received error response: com.amazonaws.services.s3.model.AmazonS3Exception:
>>> > Status Code: 404, AWS Service: null, AWS Request ID: CF1845CA84E07549,
>>> > AWS Error Code: null, AWS Error Message: Not Found, S3 Extended Request ID: xxxxx
>>> >
>>> > Has anyone met this issue before?
>>> >
>>> > flink 1.0.0
>>> > scala 2.10
>>> > hadoop-aws 2.7.2
>>> > aws-java-sdk 1.7.4
>>> >
>>> >
>>> > Thanks,
>>> > Chen
>>> >
>>> > Attached is the full log shown on the web dashboard when the job is canceled.
>>> > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
>>> >   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>> >   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>> >   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>> >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> >   at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.io.IOException: Could not open output stream for state backend
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
>>> >   at java.io.DataOutputStream.write(DataOutputStream.java:88)
>>> >   at java.io.DataOutputStream.write(DataOutputStream.java:88)
>>> >   at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
>>> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
>>> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>> >   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>>> >   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>> >   at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
>>> >   at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
>>> >   at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>> >   at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
>>> >   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>> >   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
>>> >   ... 8 more
>>> > Caused by: java.lang.NullPointerException
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>>> >   at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>>> >   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
>>> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>>> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>>> >   ... 25 more
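For context, here is a minimal sketch of the kind of job setup described above (Flink 1.0 streaming Java API assumed; the class name, bucket, checkpoint path, interval, and the dummy source are made up for illustration, and the task managers still need fs.s3a.buffer.dir set in their Hadoop configuration):

  import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.source.SourceFunction;

  public class S3aCheckpointSketch {

      public static void main(String[] args) throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // Trigger a checkpoint every 10 seconds (interval picked arbitrarily for the sketch).
          env.enableCheckpointing(10000);

          // File system state backend writing checkpoint data via the s3a scheme.
          // Bucket and path are placeholders. Without fs.s3a.buffer.dir in the
          // Hadoop configuration, creating the S3AOutputStream fails with the
          // NullPointerException shown in the trace above.
          env.setStateBackend(new FsStateBackend("s3a://my-bucket/flink/checkpoints"));

          // Unbounded dummy source so the job stays alive long enough to checkpoint.
          env.addSource(new SourceFunction<Long>() {
              private volatile boolean running = true;

              @Override
              public void run(SourceContext<Long> ctx) throws Exception {
                  long i = 0;
                  while (running) {
                      ctx.collect(i++);
                      Thread.sleep(100);
                  }
              }

              @Override
              public void cancel() {
                  running = false;
              }
          }).print();

          env.execute("s3a checkpoint sketch");
      }
  }

With the buffer directory configured on the task managers, checkpoint data should then actually appear under the configured s3a:// path instead of only empty directories.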