Hi,

They are actually using different interfaces and dependencies. Checkpointing 
uses the Flink FileSystem, and the shaded Hadoop FileSystem is a special 
implementation of it, based on the Hadoop S3 FileSystem, that has all 
dependencies bundled in. The BucketingSink uses the HDFS/Hadoop FileSystem, 
so it needs the correct dependency setup of its own: the Hadoop S3 file 
system and the AWS classes it depends on have to be on the classpath.
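
A quick way to see which side of this split is missing something is to check 
whether the classes named in the stack trace can be loaded at all. Below is a 
minimal sketch, not a Flink or Hadoop utility: the class name ClasspathProbe 
is hypothetical, and the default class names are Hadoop's S3A file system and 
the AWS SDK exception from the trace. The "Caused by" frames show the failing 
lookup went through sun.misc.Launcher$AppClassLoader, so the sketch probes the 
system class loader by default; it only says something useful when run with 
the same classpath as the TaskManager.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch (hypothetical helper, not part of Flink or Hadoop):
// reports whether each class name can be loaded by a given class loader.
final class ClasspathProbe {

    // True if the loader can load the class; static initializers are not run.
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false; // not on this loader's classpath at all
        } catch (LinkageError e) {
            return false; // found, but one of its own dependencies is missing
        }
    }

    // Probes every name, keeping the input order in the result.
    static Map<String, Boolean> probe(List<String> classNames, ClassLoader loader) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String name : classNames) {
            result.put(name, isVisible(name, loader));
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults: Hadoop's S3A file system and the AWS SDK class from the trace.
        List<String> names = args.length > 0
                ? Arrays.asList(args)
                : Arrays.asList("org.apache.hadoop.fs.s3a.S3AFileSystem",
                                "com.amazonaws.AmazonClientException");
        // The failing lookup in the trace used AppClassLoader, i.e. the system loader.
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        for (Map.Entry<String, Boolean> e : probe(names, loader).entrySet()) {
            System.out.println((e.getValue() ? "found    " : "MISSING  ") + e.getKey());
        }
    }
}
```

For example, running it with the jars from Flink's lib directory on the 
classpath (something like java -cp "$FLINK_HOME/lib/*:." ClasspathProbe) 
would show whether S3AFileSystem is present while AmazonClientException is 
missing, which is the situation the stack trace describes.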

Flink 1.6 introduced the new StreamingFileSink, which is a replacement for 
the BucketingSink. With Flink 1.7, it will also support the bundled S3 file 
systems.

Best,
Aljoscha

> On 3. Oct 2018, at 17:55, Julio Biason <julio.bia...@azion.com> wrote:
> 
> Hi Andrey,
> 
> Yes, we followed the guide. Our checkpoints/savepoints are already being 
> saved on S3/Ceph, using the ShadedHadoop/S3AFileSystem (because it's the only 
> one with which we managed to fully override the AWS endpoint address to point 
> to our Ceph cluster).
> 
> I suppose I can add the package containing AmazonClientException to my 
> project, but I still wonder why it works fine for Flink's checkpoints but 
> fails in my job; in theory, both are using the same dependencies, right?
> 
> On Wed, Oct 3, 2018 at 7:51 AM, Andrey Zagrebin <and...@data-artisans.com> wrote:
> Hi Julio,
> 
> It looks like a problem with dependencies.
> Have you followed the recommended S3 configuration guide [1]?
> Is it correct that your job has already been creating checkpoints/savepoints 
> on S3?
> 
> I think that if you manually create a file system using FileSystem.get(path), 
> it should be configured the same way as for the bucketing sink and checkpoints.
> 
> Best,
> Andrey
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/aws.html#s3-simple-storage-service
> 
>> On 2 Oct 2018, at 15:21, Julio Biason <julio.bia...@azion.com> wrote:
>> 
>> Hey guys,
>> 
>> I've set up a BucketingSink as a dead letter queue writing to our Ceph 
>> cluster over S3, but when I start the job, I get this error:
>> 
>> java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
>>      at java.lang.Class.forName0(Native Method)
>>      at java.lang.Class.forName(Class.java:348)
>>      at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
>>      at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
>>      at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
>>      at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
>>      at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
>>      at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
>>      at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
>>      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>      at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>      at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>>      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>      at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
>>      at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>      ... 17 more
>> 
>> I find it weird, because I've already set up checkpoints (and savepoints) to 
>> use S3 as the protocol, and I just assumed that if it works for checkpoints, 
>> it should work here.
>> 
>> (I suppose I could add the AWS client as a dependency of my build, but, 
>> again, I assumed that once S3 works for checkpoints, it would work 
>> everywhere.)
>> 
>> And, kind of related: can I assume that using the FileSystem class to create 
>> FSDataOutputStreams will follow the Flink configuration? I have another type 
>> of dead letter queue that won't work with BucketingSink, and I was thinking 
>> about using FileSystem directly to create files inside that Ceph/S3 storage.
>> 
>> -- 
>> Julio Biason, Software Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
> 
