Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis
consumer into an "uber jar".

Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}
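
(The first/last choices just pick one copy of each conflicting file instead of failing sbt-assembly's default deduplicate check; with that in place, sbt assembly builds the uber jar cleanly.)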

Everything appears to be working fine. Right now my producer is pushing
simple strings into Kinesis, and my consumer is trying to print them
(using Spark's print() method for now).
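
(For context, the producer side is nothing Spark-specific. Roughly, it's just a putRecord loop like this sketch, with the record text and partition keys as placeholders:

import java.nio.ByteBuffer

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

object MyProducer {
  def main(args: Array[String]) {
    val Array(streamName, endpointUrl) = args
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    /* Push a handful of simple strings, one Kinesis record per string */
    (1 to 10).foreach { i =>
      val data = "test record " + i
      kinesisClient.putRecord(streamName, ByteBuffer.wrap(data.getBytes("UTF-8")), "partitionKey-" + i)
    }
  }
})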

However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea what might be going on?

Thanks,

Vadim

Here's my consumer code (adapted from the WordCount example):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

private object MyConsumer extends Logging {
  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConf and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval.  Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams and interpret each record's bytes as a String */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
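
For reference, I'm submitting the assembly roughly like this (the stream name is a placeholder, and the master/deploy options are whatever your cluster needs):

spark-submit --class MyConsumer consumer-assembly.jar my-stream https://kinesis.us-east-1.amazonaws.com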


On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:

> Just remove "provided" for spark-streaming-kinesis-asl
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl"
> % "1.3.0"
>
> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <
> vadim.bichuts...@gmail.com> wrote:
>
>> Thanks. So how do I fix it?
>>
>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com>
>> wrote:
>>
>>>   spark-streaming-kinesis-asl is not part of the Spark distribution on
>>> your cluster, so you cannot have it be just a "provided" dependency.  This
>>> is also why the KCL and its dependencies were not included in the assembly
>>> (but yes, they should be).
>>>
>>>
>>>  ~ Jonathan Kelly
>>>
>>>   From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>> Date: Friday, April 3, 2015 at 12:26 PM
>>> To: Jonathan Kelly <jonat...@amazon.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>
>>> Subject: Re: Spark + Kinesis
>>>
>>>   Hi all,
>>>
>>>  Good news! I was able to create a Kinesis consumer and assemble it
>>> into an "uber jar" following
>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>> and example
>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>> .
>>>
>>>  However when I try to spark-submit it I get the following exception:
>>>
>>>  Exception in thread "main" java.lang.NoClassDefFoundError:
>>> com/amazonaws/auth/AWSCredentialsProvider
>>>
>>>  Do I need to include the KCL dependency in build.sbt? Here's what it
>>> looks like currently:
>>>
>>>  import AssemblyKeys._
>>> name := "Kinesis Consumer"
>>> version := "1.0"
>>> organization := "com.myconsumer"
>>> scalaVersion := "2.11.5"
>>>
>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %%
>>> "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>
>>>  assemblySettings
>>> jarName in assembly :=  "consumer-assembly.jar"
>>> assemblyOption in assembly := (assemblyOption in
>>> assembly).value.copy(includeScala=false)
>>>
>>>  Any help appreciated.
>>>
>>>  Thanks,
>>> Vadim
>>>
>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com>
>>> wrote:
>>>
>>>>  It looks like you're attempting to mix Scala versions, so that's
>>>> going to cause some problems.  If you really want to use Scala 2.11.5, you
>>>> must also use Spark package versions built for Scala 2.11 rather than
>>>> 2.10.  Anyway, that's not quite the correct way to specify Scala
>>>> dependencies in build.sbt.  Instead of placing the Scala version after the
>>>> artifactId (like "spark-core_2.10"), what you actually want is to use just
>>>> "spark-core" with two percent signs before it.  Using two percent signs
>>>> will make it use the version of Scala that matches your declared
>>>> scalaVersion.  For example:
>>>>
>>>>  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" %
>>>> "provided"
>>>>
>>>>  libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>>> "1.3.0" % "provided"
>>>>
>>>>  libraryDependencies += "org.apache.spark" %%
>>>> "spark-streaming-kinesis-asl" % "1.3.0"
>>>>
>>>>  I think that may get you a little closer, though I think you're
>>>> probably going to run into the same problems I ran into in this thread:
>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html  I
>>>> never really got an answer for that, and I temporarily moved on to other
>>>> things for now.
>>>>
>>>>
>>>>  ~ Jonathan Kelly
>>>>
>>>>   From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>> To: "user@spark.apache.org" <user@spark.apache.org>
>>>> Subject: Spark + Kinesis
>>>>
>>>>   Hi all,
>>>>
>>>>  I am trying to write an Amazon Kinesis consumer Scala app that
>>>> processes data in the
>>>> Kinesis stream. Is this the correct way to specify *build.sbt*:
>>>>
>>>>  -------
>>>> import AssemblyKeys._
>>>> name := "Kinesis Consumer"
>>>> version := "1.0"
>>>> organization := "com.myconsumer"
>>>> scalaVersion := "2.11.5"
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>
>>>> assemblySettings
>>>> jarName in assembly := "consumer-assembly.jar"
>>>> assemblyOption in assembly := (assemblyOption in
>>>>   assembly).value.copy(includeScala=false)
>>>> --------
>>>>
>>>>  In project/assembly.sbt I have only the following line:
>>>>
>>>>  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>
>>>>  I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark
>>>> book.
>>>>
>>>>  Thanks,
>>>> Vadim
>>>>
>>>>
>>>
>>
>
