I have hadoop-common in my build.sbt because my jar stopped compiling after moving from 1.3.2 to 1.4.0: org.apache.hadoop.fs.{FileSystem, Path} are no longer bundled with Flink, and I use them in my custom bucketer and in my writer that writes Avro out to Parquet.
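For reference, a minimal sketch of the kind of bucketer I mean (the MyEvent type and the date-based layout are illustrative, but it shows why the job jar needs org.apache.hadoop.fs.Path on the compile classpath):

    import org.apache.flink.streaming.connectors.fs.Clock
    import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
    import org.apache.hadoop.fs.Path

    // Illustrative event type; the real records are Avro.
    case class MyEvent(eventDate: String)

    // Flink's Bucketer interface hands us Hadoop Path objects, so the
    // job jar has to compile against hadoop-common.
    class DateBucketer extends Bucketer[MyEvent] {
      override def getBucketPath(clock: Clock, basePath: Path, element: MyEvent): Path =
        new Path(basePath, element.eventDate) // e.g. <base>/2018-01-03
    }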
I tried adding classloader.resolve-order: parent-first to my flink-conf.yaml, but that didn't seem to work.

I grepped my jar for "hadoop" and found the following:

    org/apache/hadoop/*
    org/apache/parquet/hadoop/*

After marking the hadoop-common dependency as "provided" (the build.sbt change is sketched below), only the org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine" and "ProtobufRpcEngine" error doesn't show up anymore, just the following:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    ... 9 more

Moving the hadoop-common.jar to Flink's lib/ directory also doesn't appear to help.
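For concreteness, the "provided" change in build.sbt was along these lines (same coordinates as in the dependency list below, only the scope added):

    // Compile against Hadoop's FileSystem/Path, but keep hadoop-common out
    // of the fat jar so only Flink's bundled Hadoop is on the classpath.
    "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided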
On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> I think this might be happening because partial Hadoop dependencies are in
> the user jar and the rest is only available from the Hadoop deps that come
> bundled with Flink. For example, I noticed that you have hadoop-common as a
> dependency, which probably ends up in your jar.
>
> On 4. Jan 2018, at 11:40, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> This looks indeed like a class-loading issue - it looks like "RpcEngine"
> and "ProtobufRpcEngine" are loaded via different classloaders.
>
> Can you try the following:
>
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>
> If that fixes the issue, then we can look at a way to make this seamless...
>
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
>> Hello,
>>
>> After moving to Flink 1.4.0 I'm getting the following error. I can't find
>> anything online that addresses it. Is it a Hadoop dependency issue? Here
>> are my project dependencies:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>   "org.apache.avro" % "avro" % "1.7.7",
>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>> )
>>
>> *Stacktrace:*
>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>> Using address localhost:6123 to connect to JobManager.
>> JobManager web interface address http://localhost:8082
>> Starting execution of program
>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259]
>> with leader session id 00000000-0000-0000-0000-000000000000.
>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>     ... 9 more
>> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
>>     at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>>     at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>     at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>     ... 13 more