Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an "uber jar".

Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}
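(Side note: newer versions of sbt-assembly express the same rules with the assemblyMergeStrategy key instead of the deprecated <<= form. The sketch below is what I believe the equivalent would look like; I haven't verified it against the sbt-assembly version used here.)

assemblyMergeStrategy in assembly := {
  case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
  case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
  case x =>
    // fall back to the plugin's default strategy for anything not matched above
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}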
Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark Streaming's print() method for now). However, instead of displaying my strings, all I see is log output like the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?

Thanks,
Vadim

Here's my consumer code (adapted from the KinesisWordCountASL example):

/* Imports as in the KinesisWordCountASL example this is adapted from. */
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

private object MyConsumer extends Logging {
  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args. */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream. */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards =
      kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we create one Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConf and StreamingContext. */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream has shards. */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams and decode each record's bytes into a String. */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))

    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
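(For what it's worth, here is a minimal diagnostic I could add just before ssc.start(), to check whether any records are arriving at all, i.e. whether the problem is "no data" rather than "data that isn't being printed". This is only a sketch against the unionStreams DStream above; I haven't run it yet.)

/* Hypothetical diagnostic: log how many records each 2-second batch contains. */
unionStreams.foreachRDD { (rdd, time) =>
  println(s"Batch at $time contained ${rdd.count()} record(s)")
}

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote: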
> Just remove "provided" for spark-streaming-kinesis-asl:
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>
> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>
>> Thanks. So how do I fix it?
>>
>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>
>>> spark-streaming-kinesis-asl is not part of the Spark distribution on
>>> your cluster, so you cannot have it be just a "provided" dependency. This
>>> is also why the KCL and its dependencies were not included in the assembly
>>> (but yes, they should be).
>>>
>>> ~ Jonathan Kelly
>>>
>>> From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>> Date: Friday, April 3, 2015 at 12:26 PM
>>> To: Jonathan Kelly <jonat...@amazon.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> Subject: Re: Spark + Kinesis
>>>
>>> Hi all,
>>>
>>> Good news! I was able to create a Kinesis consumer and assemble it
>>> into an "uber jar" following
>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>> and the example
>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>
>>> However, when I try to spark-submit it I get the following exception:
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> com/amazonaws/auth/AWSCredentialsProvider
>>>
>>> Do I need to include the KCL dependency in build.sbt? Here's what it
>>> looks like currently:
>>>
>>> import AssemblyKeys._
>>>
>>> name := "Kinesis Consumer"
>>> version := "1.0"
>>> organization := "com.myconsumer"
>>> scalaVersion := "2.11.5"
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>
>>> assemblySettings
>>> jarName in assembly := "consumer-assembly.jar"
>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>
>>> Any help appreciated.
>>>
>>> Thanks,
>>> Vadim
>>>
>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>>
>>>> It looks like you're attempting to mix Scala versions, so that's
>>>> going to cause some problems. If you really want to use Scala 2.11.5, you
>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>> 2.10. Anyway, that's not quite the correct way to specify Scala
>>>> dependencies in build.sbt. Instead of placing the Scala version after the
>>>> artifactId (like "spark-core_2.10"), what you actually want is to use just
>>>> "spark-core" with two percent signs before it. Using two percent signs
>>>> will make it use the version of Scala that matches your declared
>>>> scalaVersion. For example:
>>>>
>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>
>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>>>
>>>> I think that may get you a little closer, though I think you're
>>>> probably going to run into the same problems I ran into in this thread:
>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>> I never really got an answer for that, and I temporarily moved on to
>>>> other things for now.
>>>>
>>>> ~ Jonathan Kelly
>>>>
>>>> From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: Spark + Kinesis
>>>>
>>>> Hi all,
>>>>
>>>> I am trying to write an Amazon Kinesis consumer Scala app that
>>>> processes data in the Kinesis stream.
>>>> Is this the correct way to specify build.sbt:
>>>>
>>>> -------
>>>> import AssemblyKeys._
>>>>
>>>> name := "Kinesis Consumer"
>>>> version := "1.0"
>>>> organization := "com.myconsumer"
>>>> scalaVersion := "2.11.5"
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>
>>>> assemblySettings
>>>> jarName in assembly := "consumer-assembly.jar"
>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>> --------
>>>>
>>>> In project/assembly.sbt I have only the following line:
>>>>
>>>> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>
>>>> I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.
>>>>
>>>> Thanks,
>>>> Vadim