Hi, Robert BTW, I did some field study and I think it's possible to support streaming sink using presto s3 filesystem. I think that would help user to use presto s3 fs in all access to s3. I created this jira ticket https://issues.apache.org/jira/browse/FLINK-17364 . what do you think?
Best Lu On Tue, Apr 21, 2020 at 1:46 PM Lu Niu <qqib...@gmail.com> wrote: > Cool, thanks! > > On Tue, Apr 21, 2020 at 4:51 AM Robert Metzger <rmetz...@apache.org> > wrote: > >> I'm not aware of anything. I think the presto s3 file system is generally >> the recommended S3 FS implementation. >> >> On Mon, Apr 13, 2020 at 11:46 PM Lu Niu <qqib...@gmail.com> wrote: >> >>> Thank you both. Given the debug overhead, I might just try out presto s3 >>> file system then. Besides that presto s3 file system doesn't support >>> streaming sink, is there anything else I need to keep in mind? Thanks! >>> >>> Best >>> Lu >>> >>> On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger <rmetz...@apache.org> >>> wrote: >>> >>>> Hey, >>>> Others have experienced this as well, yes: >>>> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException >>>> I have also notified the Hadoop project about this issue: >>>> https://issues.apache.org/jira/browse/HADOOP-15915 >>>> >>>> I agree with Congxian: You could try reaching out to the Hadoop user@ >>>> list for additional help. Maybe logging on DEBUG level helps already? >>>> If you are up for an adventure, you could also consider adding some >>>> debugging code into Hadoop's DiskChecker and compile a custom Hadoop >>>> version. >>>> >>>> Best, >>>> Robert >>>> >>>> >>>> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu <qcx978132...@gmail.com> >>>> wrote: >>>> >>>>> Hi LU >>>>> >>>>> I'm not familiar with S3 file system, maybe others in Flink community >>>>> can help you in this case, or maybe you can also reach out to s3 >>>>> teams/community for help. >>>>> >>>>> Best, >>>>> Congxian >>>>> >>>>> >>>>> Lu Niu <qqib...@gmail.com> 于2020年4月8日周三 上午11:05写道: >>>>> >>>>>> Hi, Congxiao >>>>>> >>>>>> Thanks for replying. yeah, I also found those references. However, as >>>>>> I mentioned in original post, there is enough capacity in all disk. Also, >>>>>> when I switch to presto file system, the problem goes away. Wondering >>>>>> whether others encounter similar issue. >>>>>> >>>>>> Best >>>>>> Lu >>>>>> >>>>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu <qcx978132...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> Hi >>>>>>> From the stack, seems the problem is that " >>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop. >>>>>>> util.DiskChecker$DiskErrorException: Could not find any valid local >>>>>>> directory for s3ablock-0001-", and I googled the exception, found there >>>>>>> is >>>>>>> some relative page[1], could you please make sure there is enough space >>>>>>> on >>>>>>> the local dis. >>>>>>> >>>>>>> [1] >>>>>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt-xxxx-xxxx-m-x-file-out >>>>>>> Best, >>>>>>> Congxian >>>>>>> >>>>>>> >>>>>>> Lu Niu <qqib...@gmail.com> 于2020年4月8日周三 上午8:41写道: >>>>>>> >>>>>>>> Hi, flink users >>>>>>>> >>>>>>>> Did anyone encounter such error? The error comes from >>>>>>>> S3AFileSystem. But there is no capacity issue on any disk. we are using >>>>>>>> hadoop 2.7.1. >>>>>>>> ``` >>>>>>>> >>>>>>>> Caused by: java.util.concurrent.ExecutionException: >>>>>>>> java.io.IOException: Could not open output stream for state backend >>>>>>>> at java.util.concurrent.FutureTask.report(FutureTask.java:122) >>>>>>>> at java.util.concurrent.FutureTask.get(FutureTask.java:192) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47) >>>>>>>> at >>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011) >>>>>>>> ... 3 more >>>>>>>> Caused by: java.io.IOException: Could not open output stream for state >>>>>>>> backend >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99) >>>>>>>> at >>>>>>>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34) >>>>>>>> at >>>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) >>>>>>>> at >>>>>>>> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604) >>>>>>>> at >>>>>>>> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320) >>>>>>>> at >>>>>>>> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75) >>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447) >>>>>>>> ... 5 more >>>>>>>> Caused by: >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: >>>>>>>> Could not find any valid local directory for s3ablock-0001- >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:442) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:456) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:200) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.createTmpFileForWrite(S3AFileSystem.java:572) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ADataBlocks$DiskBlockFactory.create(S3ADataBlocks.java:811) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.createBlockIfNeeded(S3ABlockOutputStream.java:190) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3ABlockOutputStream.<init>(S3ABlockOutputStream.java:168) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:778) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1169) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1149) >>>>>>>> at >>>>>>>> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1038) >>>>>>>> at >>>>>>>> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:141) >>>>>>>> at >>>>>>>> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.create(HadoopFileSystem.java:37) >>>>>>>> at >>>>>>>> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126) >>>>>>>> at >>>>>>>> org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61) >>>>>>>> at >>>>>>>> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356) >>>>>>>> ... 22 more >>>>>>>> >>>>>>>> ``` >>>>>>>> >>>>>>>> Best >>>>>>>> Lu >>>>>>>> >>>>>>>