He was probably referring to the word-counting example in the Kinesis module here: https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114
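For reference, that example creates one receiver DStream per shard and then unions them into a single DStream, roughly as sketched below (identifiers follow the linked file, but treat this as a paraphrase rather than the exact code). A sketch of the repartitioning suggested further down the thread appears after the quoted messages.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kinesis.KinesisUtils

    // The KinesisWordCountASL pattern: one receiver DStream per shard,
    // unioned so downstream operators see a single DStream.
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, appName, streamName, endpointUrl,
        regionName, InitialPositionInStream.LATEST,
        kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
    }
    val unionStreams = ssc.union(kinesisStreams)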
On Fri, Jan 27, 2017 at 6:41 PM, ayan guha <guha.a...@gmail.com> wrote:

> Maybe a naive question: why are you creating one DStream per shard? It
> should be one DStream corresponding to the Kinesis stream, shouldn't it?
>
> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin....@gmail.com>
> wrote:
>
>> Hi,
>>
>> Just a guess, but Kinesis shards sometimes hold skewed data. So, before
>> you compute anything from the Kinesis RDDs, you had better repartition
>> them for better parallelism.
>>
>> // maropu
>>
>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:
>>
>>> Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera)
>>> to read information from Kinesis and write it to HDFS in Parquet
>>> format. The write seems very slow, and if I understood Spark's
>>> diagnostics correctly, it always seemed to run on the same executor,
>>> one partition after the other, serially. So I stripped the program
>>> down to this:
>>>
>>> val kinesisStreams = (0 until numShards).map { i =>
>>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>>     kinesisStreamName, kinesisUrl, awsRegion,
>>>     InitialPositionInStream.LATEST,
>>>     new Duration(streamingInterval.millis),
>>>     StorageLevel.MEMORY_AND_DISK_SER,
>>>     awsCredentials.accessKey, awsCredentials.secretKey)
>>> }
>>>
>>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>>
>>> allKinesisStreams.foreachRDD { rdd =>
>>>   info("total for this batch is " + rdd.count())
>>> }
>>>
>>> The Kinesis stream has 20 shards (overprovisioned for this small test).
>>> I confirmed using a small boto program that data is periodically
>>> written to all 20 of the shards. I can see that Spark has created 20
>>> executors, one for each Kinesis shard. It also creates one other
>>> executor, tied to a particular worker node, and that node seems to do
>>> the RDD counting. The streaming interval is 1 minute, during which time
>>> several shards have received data. Each minute interval, for this
>>> particular example, the driver prints out between 20 and 30 for the
>>> count value. I expected to see the count operation parallelized across
>>> the cluster. I think I must just be misunderstanding something
>>> fundamental! Can anyone point out where I'm going wrong?
>>>
>>> Yours in confusion,
>>> Graham
>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>
> --
> Best Regards,
> Ayan Guha

--
---
Takeshi Yamamuro
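And here is a minimal sketch of the repartitioning suggested earlier in the thread, applied to Graham's stripped-down loop (allKinesisStreams, numShards, and info are borrowed from his snippet; reusing the shard count as the partition count is just an assumption):

    // Hedged sketch: shuffle each batch's records across the cluster
    // before doing the real per-record work, rather than operating only
    // on the receiver-local block partitions.
    allKinesisStreams.foreachRDD { rdd =>
      val balanced = rdd.repartition(numShards) // one shuffle per batch
      info("total for this batch is " + balanced.count())
    }

Note that repartition adds a shuffle, so it pays off when the downstream per-record work (such as the Parquet write) dominates; for a bare count it mostly just demonstrates the mechanics.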