The code in HadoopRecoverableWriter is:
if (!"hdfs".equalsIgnoreCase(fs.getScheme()) ||
!HadoopUtils.isMinHadoopVersion(2, 7)) {
throw new UnsupportedOperationException(
"Recoverable writers on Hadoop are only
supported for HDFS and for Hadoop version 2.7 or newer");
}
So one possibility is that your sink path doesn’t have the explicit hdfs://xxx
protocol.
Another is that you’re in classpath hell, and your job jar contains an older
version of Hadoop jars.
— Ken
> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <[email protected]>
> wrote:
>
> Hi.
>
> I'm a bit confused:
> When launching my flink streaming application on EMR release 5.24 (which have
> flink 1.8 version) that write Kafka messages to s3 parquet files i'm getting
> the exception below, but when i'm installing flink 1.8 on EMR custom wise it
> works.
> What could be the difference behavior?
>
> Thanks,
> Yitzchak.
>
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on
> Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
> at
> org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra