OK, thanks for reporting back. Thanks to Igor as well.

I just updated the docs with a note about this.
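
For anyone who finds this thread later, here is a minimal sketch of the fix,
based on Igor's snippet below. It assumes the Hadoop configuration lives in
core-site.xml. The /tmp value is only an example; any local directory (or
comma-separated list of directories) with enough space to buffer the uploads
should work:

<configuration>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <!-- example value only; a comma-separated list of local directories is also accepted -->
    <value>/tmp</value>
  </property>
</configuration>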

On Thu, May 5, 2016 at 3:16 AM, Chen Qin <qinnc...@gmail.com> wrote:
> Ufuk & Igor,
>
> Thanks for helping out!  Yup, it fixed my issue.
>
> Chen
>
>
>
> On Wed, May 4, 2016 at 12:57 PM, Igor Berman <igor.ber...@gmail.com> wrote:
>>
>> I think I've had this issue too and fixed it as Ufuk suggested,
>> by adding something like this to core-site.xml:
>>
>> <property>
>> <name>fs.s3a.buffer.dir</name>
>> <value>/tmp</value>
>> </property>
>>
>>
>> On 4 May 2016 at 11:10, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Chen Qin,
>>>
>>> this seems to be an issue with the S3 file system. The root cause is:
>>>
>>> Caused by: java.lang.NullPointerException
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>>>   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>>>   at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>>>   at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>>>   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
>>>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
>>>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>>>   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>>>   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>>>   ... 25 more
>>>
>>> From [1] it looks like you have to specify
>>>
>>> fs.s3a.buffer.dir
>>>
>>> in the Hadoop configuration (where you set the S3 file system).
>>>
>>> The expected value is a comma-separated list of local directories used
>>> to buffer results prior to transmitting them to S3 (for large files).
>>>
>>> Does this fix the issue? Please report back so that we can include it in
>>> the "common issues" section of the AWS docs.
>>>
>>> – Ufuk
>>>
>>> [1] http://deploymentzone.com/2015/12/20/s3a-on-spark-on-aws-ec2/
>>>
>>>
>>> On Wed, May 4, 2016 at 2:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
>>> > Hi there,
>>> >
>>> > I ran a test job with the file system state backend (FsStateBackend),
>>> > saving checkpoints to S3 (via s3a).
>>> >
>>> > The job crashes when a checkpoint is triggered. Listing the objects in
>>> > the S3 directory, I found that the directory is created successfully but
>>> > all checkpoint directories are empty.
>>> >
>>> > The host running the task manager shows the following error:
>>> >
>>> > Received error response:
>>> > com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> > AWS Service: null, AWS Request ID: CF1845CA84E07549, AWS Error Code: null,
>>> > AWS Error Message: Not Found, S3 Extended Request ID:xxxxx
>>> >
>>> > Has anyone run into this issue before?
>>> >
>>> > flink 1.0.0
>>> > scala 2.10
>>> > hadoop-aws 2.7.2
>>> > aws-java-sdk 1.7.4
>>> >
>>> >
>>> > Thanks,
>>> > Chen
>>> >
>>> > Below is the full log shown on the web dashboard when the job was canceled:
>>> > java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:681)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:674)
>>> >   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>>> >   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>>> >   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>>> >   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> >   at java.lang.Thread.run(Thread.java:745)
>>> > Caused by: java.io.IOException: Could not open output stream for state backend
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:498)
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.write(FsStateBackend.java:444)
>>> >   at java.io.DataOutputStream.write(DataOutputStream.java:88)
>>> >   at java.io.DataOutputStream.write(DataOutputStream.java:88)
>>> >   at org.apache.flink.types.StringValue.writeString(StringValue.java:813)
>>> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
>>> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>> >   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>>> >   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>> >   at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:78)
>>> >   at org.apache.flink.runtime.state.ArrayListSerializer.serialize(ArrayListSerializer.java:27)
>>> >   at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
>>> >   at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
>>> >   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:175)
>>> >   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
>>> >   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:509)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:481)
>>> >   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:678)
>>> >   ... 8 more
>>> > Caused by: java.lang.NullPointerException
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:268)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
>>> >   at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
>>> >   at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
>>> >   at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:907)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:888)
>>> >   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:785)
>>> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:404)
>>> >   at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:48)
>>> >   at org.apache.flink.runtime.state.filesystem.FsStateBackend$FsCheckpointStateOutputStream.flush(FsStateBackend.java:489)
>>> >   ... 25 more
>>> >
>>
>>
>
