Hi all - first time on the mailing list, so my apologies if I break protocol on anything. Really excited to be using Flink, and hoping to be active here in the future! Also, apologies for the length of this email - I tried to include details but may have gone overboard.
The gist of my problem is an issue with packaging the Flink Kinesis connector into my user code for execution on a YARN cluster in EMR. There's some dependency trouble happening, and after about 48 hours of attempts I'm not sure how to make progress. I'd really appreciate any ideas or assistance - thank you in advance!

### First, Some Context

We're hoping to write our Flink jobs in Scala 2.11. The Flink JM/TMs currently run as YARN containers on an EMR cluster with Hadoop 2.7. We run our jobs via an Azkaban server, which has the Hadoop and Flink clients installed and is configured to point at the YARN master on our EMR cluster (with $HADOOP_HOME set so Flink can discover the Hadoop configs). We're using OpenJDK 7 everywhere, and Maven 3.3.9 when building Flink from source.

We use SBT and the assembly plugin to create an uberjar of our code and its dependencies. This gets uploaded to Azkaban, whereupon the following command is run on the Azkaban server to execute a Flink job:

    flink run -c <className> usercodeuberjar-assembly-1.0.jar

I've successfully run a few Flink jobs that execute on our EMR cluster in this fashion (the WordCount example, etc.).

### The Problem

We use AWS Kinesis and are hoping to integrate Flink with it. Naturally, we wanted to use the Kinesis connector: <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>

After following the instructions (with some experimentation), I was able to run a Flink Kinesis application on my laptop in local cluster mode (Ubuntu 16.04, local cluster started with `./start-local.sh`, job submitted via `flink run -c <className> usercodeuberjar-assembly-1.0.jar`).

I uploaded the same JAR to Azkaban and ran the same command to submit the job to our EMR cluster, and got a `java.lang.NoSuchMethodError: com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()` (I've included the full stack trace at the bottom of this email). I inspected the uploaded JAR with `unzip usercodeuberjar-assembly-1.0.jar`, looked in `com/amazonaws`, and found the SDKGlobalConfiguration.class file. I decompiled and inspected it, and the isInRegionOptimizedModeEnabled method that was purportedly missing was indeed present.

I've included the steps I took to manifest this problem below, along with the various things I tried in order to resolve it - any help or insight is greatly appreciated!
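As an aside, beyond decompiling: a reflection check along these lines could be run inside the job to see where the class actually resolves from at runtime. This is just a sketch; `ClassOriginCheck` and `report` are illustrative names, not anything from Flink or the AWS SDK:

    object ClassOriginCheck {
      // Report which code source a class is loaded from, and whether a
      // given method exists on it. getCodeSource can return null for
      // bootstrap-loaded classes, hence the Option.
      def report(className: String, methodName: String): String = {
        val clazz = Class.forName(className)
        val source = Option(clazz.getProtectionDomain.getCodeSource)
          .map(_.getLocation.toString)
          .getOrElse("<bootstrap or unknown>")
        val hasMethod = clazz.getMethods.exists(_.getName == methodName)
        s"$className loaded from $source; $methodName present: $hasMethod"
      }
    }

Calling `ClassOriginCheck.report("com.amazonaws.SDKGlobalConfiguration", "isInRegionOptimizedModeEnabled")` from inside an operator (e.g., a map function), so that it executes in the TaskManagers rather than on the client, should show whether the YARN containers resolve the class from my uberjar or from a JAR that EMR puts on the classpath.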
### Repro

I'm not sure how to provide a clean repro, but I'll try to include as much detail as I can about the sequence of actions and commands I ran, since there may be some obvious mistakes.

Downloading the Flink release to my laptop:

    wget http://www-us.apache.org/dist/flink/flink-1.1.3/flink-1.1.3-bin-hadoop27-scala_2.11.tgz
    tar xfzv flink-1.1.3-bin-hadoop27-scala_2.11.tgz

I then SSH'd into Azkaban and ran the same two commands, while adding the bin/ directory to my PATH and tweaking the config for fs.hdfs.hadoopconf.

Next, after getting the Flink binaries, I fetched the source code in order to follow the instructions here: <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kinesis.html>

    wget https://github.com/apache/flink/archive/release-1.1.3.tar.gz
    tar xfzv release-1.1.3.tar.gz

Here, I wanted to leverage our EMR instance profile role instead of passing in credentials, so I wanted the AUTO value for the "aws.credentials.provider" config, which seems to have been added after 1.1.3 - I made a couple of small tweaks to AWSConfigConstants.java and AWSUtil.java to allow for that AUTO value.

Next, since we're using Scala 2.11, I changed the Scala version per the instructions here: <https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/building.html#scala-versions>

    tools/change-scala-version.sh 2.11

Back to the Kinesis connector documentation:

    mvn clean install -Pinclude-kinesis -DskipTests
    cd flink-dist
    mvn clean install -Pinclude-kinesis -DskipTests

When running that second mvn clean install, I get some warnings about the maven-shade-plugin having conflicting versions, and also:

    [WARNING] The requested profile "include-kinesis" could not be activated because it does not exist.

At this point the instructions aren't too clear on what to do, so I proceeded to this section to try to figure it out: <https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution>

My goal is to package everything into my user code JAR, and I'll try to do that with SBT. My first try is to install the Flink Kinesis connector JAR generated by mvn clean install into my local Maven repo:

    mvn install:install-file -Dfile=flink-connector-kinesis_2.11-1.1.3.jar

I then build the JAR with a build.sbt that looks like this (extraneous details removed):

    scalaVersion in ThisBuild := "2.11.8"

    val flinkVersion = "1.1.3"

    val flinkDependencies = Seq(
      "org.apache.flink" %% "flink-scala" % flinkVersion,
      "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
      "org.apache.flink" %% "flink-connector-kinesis" % flinkVersion
    )

    lazy val proj = (project in file(".")).
      settings(
        libraryDependencies ++= flinkDependencies
      )

After this builds, I unzip the JAR and use JD to decompile the com.amazonaws.SDKGlobalConfiguration class file to see whether the method in question is present (it is). I then run the JAR locally with `flink run -c <className> usercodeuberjar-assembly-1.0.jar`, and I can see it running just fine when navigating to localhost:8081.
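For reference, the Kinesis portion of the job looks roughly like the following. This is a trimmed sketch, not the exact job: the stream name and region are placeholders, and "AUTO" is the credentials provider value I patched into AWSConfigConstants/AWSUtil:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object KinesisSmokeTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val consumerConfig = new Properties()
        consumerConfig.setProperty("aws.region", "us-east-1")          // placeholder region
        consumerConfig.setProperty("aws.credentials.provider", "AUTO") // backported AUTO value

        // Consume from a Kinesis stream and just print the records.
        val kinesis = env.addSource(
          new FlinkKinesisConsumer[String]("my-stream", new SimpleStringSchema(), consumerConfig))
        kinesis.print()

        env.execute("kinesis-smoke-test")
      }
    }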
I then upload this same JAR to our Azkaban server and run the same `flink run -c <className> usercodeuberjar-assembly-1.0.jar` command to submit it as a YARN application - this time, I get the `NoSuchMethodError`.

I've tried a variety of permutations of this, so I'll list them along with their results:

1. A non-Kinesis Flink job: I was able to successfully run the example WordCount Flink job as a YARN application.

2. I mvn installed the newly built flink-scala and flink-streaming-scala JARs to my local Maven repository in case these were different - after building and running on Azkaban... same error.

3. Using the newly built flink-dist JAR (with the -Pinclude-kinesis flag): after replacing the flink-dist JAR in the /lib dir on Azkaban (the JAR the `flink` command actually runs), I still had the same error.

4. Packaging the JAR in different ways:
   - I tried adding the flink-connector-kinesis JAR to a /lib directory in my SBT project for direct inclusion. This actually caused the NoSuchMethodError to occur during *local* execution as well.
   - I tried using the Maven assembly plugin to package all of the flink-connector-kinesis dependencies into that JAR, and then added it to the /lib directory in my SBT project. Local execution no longer had the error, but submission from Azkaban still failed with the same one.

5. I thought it might be a classpath issue (my laptop doesn't have a Hadoop installation, so I figured there might be some kind of collision with the AWS SDK included by EMR - see the shading sketch at the end of this email), so on Azkaban I set the environment variable FLINK_CLASSPATH=usercodeuberjar-assembly-1.0.jar to get the JAR prepended to the classpath - same error.

6. I realized this wasn't necessarily affecting how class names get resolved for the Flink job executing in YARN. So I dug into the client source, which eventually led me to flink-clients/.../program/PackagedProgram.java, which has the following line of code setting the ClassLoader:

        this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());

   getAllLibraries() does seem to put the JAR that you pass to the `flink` command at the top of the class resolution hierarchy, and, as my earlier foray into decompilation shows, that JAR does include the method that is supposedly missing.

At this point I've run out of ideas to investigate, so I'm hoping someone here can help. Thanks in advance for reading this!

Full Stack Trace:

    java.lang.NoSuchMethodError: com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()Z
        at com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:35)
        at org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.createKinesisClient(AWSUtil.java:50)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:118)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:176)
        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:188)
        at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:198)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
        at java.lang.Thread.run(Thread.java:745)
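P.S. One idea I haven't tried yet, in case this really is a collision with an older AWS SDK on the EMR/YARN classpath (permutation 5 above): relocating the com.amazonaws packages inside the uberjar using sbt-assembly's shade rules, so nothing in the job can resolve the SDK from anywhere else. A minimal sketch of what I have in mind, assuming sbt-assembly 0.14+ (the "shaded" prefix is arbitrary):

    // build.sbt: rewrite com.amazonaws.* references in every JAR that goes
    // into the assembly, so the uberjar carries its own relocated AWS SDK.
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.amazonaws.**" -> "shaded.com.amazonaws.@1").inAll
    )

Since `.inAll` also rewrites the call sites inside flink-connector-kinesis, the connector would end up invoking the relocated classes rather than whatever SDK version Hadoop/EMR provides on the container classpath.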