Hey guys, I've set up a BucketingSink as a dead letter queue into our Ceph cluster using S3, but when I start the job, I get this error:

java.lang.NoClassDefFoundError: com/amazonaws/AmazonClientException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2306)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2271)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2367)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2793)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1295)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:432)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:376)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.AmazonClientException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 17 more

I find it weird 'cause I've already set up checkpoints (and savepoints) to use S3 as the protocol, and I just assumed that, if it works for checkpoints, it should work here. (I suppose I could add the AWS client as a dependency of my build but, again, I assumed that once S3 works for checkpoints, it should work everywhere.)

And, kinda related: can I assume that using the FileSystem class to create FSOutputStreams will follow the Flink configuration? I have another type of dead letter queue that won't work with BucketingSink, and I was thinking about using FileSystem directly to create files inside that Ceph/S3 cluster.

--
*Julio Biason*, Software Engineer
*AZION* | Deliver. Accelerate. Protect.
Office: +55 51 3083 8101 | Mobile: +55 51 *99907 0554*