Guys, on my local machine it consumes a Kinesis stream with 3 shards, but on EC2 it does not consume from the stream. Later we found that the EC2 machine had 2 cores while my local machine has 4 cores. I am using a single machine in Spark standalone mode. We then got a larger machine from EC2, and now the Kinesis stream is being consumed.

4 cores, single machine -> works
2 cores, single machine -> does not work
2 cores, 2 workers      -> does not work

So my question is: do we need a cluster of (#KinesisShards + 1) workers to be able to consume from Kinesis?

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Nov 27, 2014, at 10:28 AM, Aniket Bhatnagar <aniket.bhatna...@gmail.com> wrote:

> Did you set spark master as local[*]? If so, then it means that the number of
> executors is equal to the number of cores of the machine. Perhaps your mac
> machine has more cores (certainly more than number of kinesis shards + 1).
>
> Try explicitly setting master as local[N] where N is number of kinesis shards
> + 1. It should then work on both the machines.
>
> On Thu, Nov 27, 2014, 9:46 AM Ashrafuzzaman <ashrafuzzaman...@gmail.com>
> wrote:
> I was trying on one machine with just sbt run.
>
> And it is working in my mac environment with the same configuration.
>
> I used the sample code from
> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>
> val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
> kinesisClient.setEndpoint(endpointUrl)
> val numShards =
>   kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>
> /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
> val numStreams = numShards
>
> /* Set up the SparkConf and StreamingContext */
> /* Spark Streaming batch interval */
> val batchInterval = Milliseconds(2000)
> val sparkConfig = new SparkConf().setAppName("KinesisWordCount")
> val ssc = new StreamingContext(sparkConfig, batchInterval)
>
> /* Kinesis checkpoint interval. Same as batchInterval for this example. */
> val kinesisCheckpointInterval = batchInterval
>
> /* Create the same number of Kinesis DStreams/Receivers as Kinesis stream's shards */
> val kinesisStreams = (0 until numStreams).map { i =>
>   KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>     InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
> }
>
> /* Union all the streams */
> val unionStreams = ssc.union(kinesisStreams)
>
> /* Convert each line of Array[Byte] to String, split into words, and count them */
> val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
>
> /* Map each word to a (word, 1) tuple so we can reduce/aggregate by key. */
> val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
>
> /* Print the first 10 wordCounts */
> wordCounts.print()
>
> /* Start the streaming context and await termination */
> ssc.start()
> ssc.awaitTermination()
>
> On Wed, Nov 26, 2014 at 11:26 PM, Aniket Bhatnagar
> <aniket.bhatna...@gmail.com> wrote:
> What's your cluster size? For streaming to work, it needs shards + 1 executors.
>
> On Wed, Nov 26, 2014, 5:53 PM A.K.M. Ashrafuzzaman
> <ashrafuzzaman...@gmail.com> wrote:
> Hi guys,
> When we are using Kinesis with 1 shard then it works fine. But when we use
> more than 1 then it falls into an infinite loop and no data is processed by
> the spark streaming. In the Kinesis DynamoDB table, I can see that it keeps
> increasing the leaseCounter. But it does not start processing.
>
> I am using,
> scala: 2.10.4
> java version: 1.8.0_25
> Spark: 1.1.0
> spark-streaming-kinesis-asl: 1.1.0
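
For reference, the local[N] suggestion quoted above can be sketched in Scala. This is only a minimal sketch, assuming one receiver per shard as in the quoted sample, and relying on the Spark Streaming rule that each receiver occupies one core, so at least one more core has to be left over for processing the batches. The names KinesisCores, requiredCores, localMaster and canProcess are hypothetical helpers made up for illustration; they are not part of Spark or of the sample code.

```scala
// Hypothetical helper (not part of Spark): relates the number of Kinesis
// receivers to the number of cores the streaming application needs.
object KinesisCores {

  /** One core per receiver, plus at least one core to process the received data. */
  def requiredCores(numReceivers: Int): Int = {
    require(numReceivers >= 0, "numReceivers must be non-negative")
    numReceivers + 1
  }

  /** Master string for local mode, assuming one receiver per shard. */
  def localMaster(numShards: Int): String =
    s"local[${requiredCores(numShards)}]"

  /** True if the available cores are enough to both receive and process. */
  def canProcess(availableCores: Int, numReceivers: Int): Boolean =
    availableCores >= requiredCores(numReceivers)
}

object KinesisCoresExample extends App {
  val numShards = 3 // the stream in this thread has 3 shards

  println(KinesisCores.localMaster(numShards))   // local[4]
  println(KinesisCores.canProcess(4, numShards)) // true  (the 4-core machine)
  println(KinesisCores.canProcess(2, numShards)) // false (the 2-core machine)
}
```

The resulting string would then be passed to the configuration, for example new SparkConf().setAppName("KinesisWordCount").setMaster(KinesisCores.localMaster(numShards)). On a standalone cluster the same count should apply to the total number of cores granted to the application rather than to the number of worker machines, so (shards + 1) separate workers would not necessarily be needed.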