Hey guys,

I've set up a BucketingSink as a dead letter queue that writes to our Ceph
cluster over S3, but when I start the job, I get this error:


java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
        at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 17 more

I find it weird 'cause I've already set up checkpoints (and savepoints) to
use S3, and I just assumed that, if it works for checkpoints, it should
work here too.

(I suppose I could add the aws client as a dependency of my build but,
again, I assumed that once S3 works for checkpoints, it should work
everywhere.)
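
To narrow it down, a quick check along these lines could tell whether the
class from the stack trace is visible at all. This is only a sketch: the
class name ClasspathCheck and the helper isLoadable are made up for
illustration, and the only real API used is the standard Class.forName.
I'm also assuming that the context classloader is the interesting one to
compare against when the check runs inside a task.

```java
public class ClasspathCheck {

    // Hypothetical helper: returns true if `className` can be located by `loader`.
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so both failure modes end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace above.
        String name = "com.amazonaws.AmazonClientException";
        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("system classloader:  " + isLoadable(name, system));
        System.out.println("context classloader: " + isLoadable(name, context));
    }
}
```

Run standalone, this only says something about the plain JVM classpath; to
see what the job itself sees, the same isLoadable call would have to be
made from inside an operator.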

And kinda related, can I assume that using the FileSystem class to create
FSOutputStreams will pick up the Flink configuration? I have another type
of dead letter queue that won't work with BucketingSink, and I was thinking
about using FileSystem directly to create files inside that Ceph/S3.

-- 
*Julio Biason*, Software Engineer
*AZION*  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
