Hi, those are actually different interfaces and dependencies. Checkpointing uses the Flink FileSystem abstraction, and the shaded Hadoop FileSystem is a special implementation of it, based on the Hadoop S3 FileSystem, with all dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem directly, so it needs the correct dependency setup on its own.
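One quick way to see this difference from inside a job is to probe the classpath the same way Hadoop's Configuration.getClassByName does when the BucketingSink initializes its file system. This is a minimal, self-contained sketch, not Flink code; the class and method names here are illustrative:

```java
// Sketch: reproduce how a missing transitive dependency surfaces.
// Hadoop's Configuration.getClassByName ultimately calls Class.forName,
// which throws ClassNotFoundException when the class is absent from the
// job's classpath (reported higher up as NoClassDefFoundError).
public class MissingDependencyCheck {

    /** Returns true if the named class can be loaded from the classpath. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Always present: part of the JDK.
        System.out.println(isOnClasspath("java.lang.String"));
        // False unless the AWS SDK is on the classpath -- this is the
        // class from the stack trace further down the thread.
        System.out.println(isOnClasspath("com.amazonaws.AmazonClientException"));
    }
}
```

Running this inside the job jar (or a plain JVM with the same classpath) shows whether the AWS classes the BucketingSink's Hadoop S3A FileSystem needs are actually visible to it.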
Flink 1.6 introduced the new StreamingFileSink, which is a replacement for BucketingSink. With Flink 1.7 it will also support the bundled S3 file systems.

Best,
Aljoscha

> On 3 Oct 2018, at 17:55, Julio Biason <julio.bia...@azion.com> wrote:
>
> Hi Andrey,
>
> Yes, we followed the guide. Our checkpoints/savepoints are already being
> saved on S3/Ceph, using the ShadedHadoop/S3AFileSystem (because it's the one
> where we managed to completely override the AWS address to point to our Ceph
> cluster).
>
> I suppose I can add the package with the AmazonClientException to my project,
> but I still wonder why it works fine for Flink but fails for my project; in
> theory, both are using the same dependencies, right?
>
> On Wed, Oct 3, 2018 at 7:51 AM, Andrey Zagrebin <and...@data-artisans.com> wrote:
>
> Hi Julio,
>
> Looks like a problem with dependencies.
> Have you followed the recommended S3 configuration guide [1]?
> Is it correct that your job already created checkpoints/savepoints on S3
> before?
>
> I think if you manually create a file system using FileSystem.get(path), it
> should be configured the same way as for the bucketing sink and checkpoints.
>
> Best,
> Andrey
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service
>
>> On 2 Oct 2018, at 15:21, Julio Biason <julio.bia...@azion.com> wrote:
>>
>> Hey guys,
>>
>> I've set up a BucketingSink as a dead letter queue into our Ceph cluster
>> using S3, but when I start the job, I get this error:
>>
>> java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:348)
>>     at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
>>     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
>>     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
>>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>     ... 17 more
>>
>> I find it weird because I've already set up checkpoints (and savepoints) to
>> use S3 as the protocol, and I just assumed that if it works for checkpoints, it
>> should work here.
>>
>> (I suppose I could add the AWS client as a dependency of my build but,
>> again, I assumed that once S3 works for checkpoints, it should work
>> everywhere.)
>>
>> And kind of related: can I assume that using the FileSystem class to create
>> FSOutputStreams will follow the Flink configuration? I have another type of dead
>> letter queue that won't work with BucketingSink, and I was thinking about
>> using it directly to create files inside that Ceph/S3.
>>
>> --
>> Julio Biason, Software Engineer
>> AZION | Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
>
>
> --
> Julio Biason, Software Engineer
> AZION | Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101 | Mobile: +55 51 99907 0554
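For reference, the workaround Julio mentions (adding the package that contains AmazonClientException to the job) could look like the fragment below. This is a minimal sketch assuming a Maven build; the artifact is the AWS SDK core, and the version number is purely illustrative and should be matched to the hadoop-aws version in use:

```xml
<!-- Hypothetical pom.xml fragment: bundles com.amazonaws.AmazonClientException
     (from the AWS SDK core) into the job jar so the BucketingSink's
     Hadoop S3A FileSystem can load it. Version is illustrative only;
     align it with the AWS SDK version your hadoop-aws expects. -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.11.95</version>
</dependency>
```

Note that this only patches the BucketingSink's classpath; per Aljoscha's reply above, the longer-term route is the StreamingFileSink, which works with the bundled S3 file systems from Flink 1.7 on.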