Hi,

Just a guess, but Kinesis shards sometimes hold skewed data. So before you compute anything from the Kinesis RDDs, you'd be better off repartitioning them for better parallelism.
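Something along these lines is what I have in mind. It is only a rough sketch that I have not run against your setup: `countInParallel` and `numPartitions` are names I made up for illustration, and the right partition count is an assumption you would need to tune for your cluster. `DStream.repartition` shuffles each batch's RDD across the cluster before the count runs.

```scala
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper (not part of Spark or of the original program).
// `stream` would be the unioned Kinesis DStream; `numPartitions` is an
// assumed tuning value, e.g. a small multiple of the total executor cores.
def countInParallel(stream: DStream[Array[Byte]], numPartitions: Int): Unit = {
  stream
    .repartition(numPartitions) // shuffle each batch into numPartitions partitions
    .foreachRDD { rdd =>
      // Report the partition count so the effect of the repartition is visible.
      println(s"partitions=${rdd.partitions.length}, " +
        s"total for this batch is ${rdd.count()}")
    }
}
```

With your program that would be called as `countInParallel(allKinesisStreams, 40)`, where 40 is just a placeholder. Note that with only 20 to 30 records per batch the shuffle may cost more than it saves, so the difference should show up mainly on larger batches.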
// maropu

On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:

> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera) to
> read information from Kinesis and write it to HDFS in parquet format. The
> write seems very slow, and if I understood Spark's diagnostics correctly,
> always seemed to run from the same executor, one partition after the other,
> serially. So I stripped the program down to this:
>
> > val kinesisStreams = (0 until numShards).map { i => {
> >   KinesisUtils.createStream(streamingContext, sparkApplicationName,
> >     kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.LATEST,
> >     new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_SER,
> >     awsCredentials.accessKey, awsCredentials.secretKey)
> > }}
> > val allKinesisStreams = streamingContext.union(kinesisStreams)
> > allKinesisStreams.foreachRDD {
> >   rdd => {
> >     info("total for this batch is " + rdd.count())
> >   }
> > }
>
> The Kinesis stream has 20 shards (overprovisioned for this small test). I
> confirmed using a small boto program that data is periodically written to
> all 20 of the shards. I can see that Spark has created 20 executors, one
> for each Kinesis shard. It also creates one other executor, tied to a
> particular worker node, and that node seems to do the RDD counting. The
> streaming interval is 1 minute, during which time several shards have
> received data. Each minute interval, for this particular example, the
> driver prints out between 20 and 30 for the count value. I expected to see
> the count operation parallelized across the cluster. I think I must just be
> misunderstanding something fundamental! Can anyone point out where I'm
> going wrong?
>
> Yours in confusion,
> Graham

--
---
Takeshi Yamamuro