Hi,

Just a guess, but Kinesis shards sometimes have skewed data.
So, before you compute anything from the Kinesis RDDs, you'd be better off
repartitioning them for better parallelism.
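For example, something like this (just a sketch using the names from your snippet; `defaultParallelism` is only one reasonable target for the partition count):

```scala
// Sketch only: repartition each micro-batch so the count runs across
// the whole cluster rather than on whichever executor holds the blocks
// received from Kinesis. The partition count here is an assumption --
// tune it for your cluster.
allKinesisStreams.foreachRDD { rdd =>
  val balanced = rdd.repartition(streamingContext.sparkContext.defaultParallelism)
  info("total for this batch is " + balanced.count())
}
```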

// maropu

On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:

> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera) to
> read information from Kinesis and write it to HDFS in parquet format. The
> write seems very slow, and if I understood Spark's diagnostics correctly,
> always seemed to run from the same executor, one partition after the other,
> serially. So I stripped the program down to this:
>
>
> val kinesisStreams = (0 until numShards).map { i =>
>
>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>     kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.LATEST,
>     new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_SER,
>     awsCredentials.accessKey, awsCredentials.secretKey)
> }
>
> val allKinesisStreams = streamingContext.union(kinesisStreams)
>
> allKinesisStreams.foreachRDD { rdd =>
>
>   info("total for this batch is " + rdd.count())
>
> }
>
> The Kinesis stream has 20 shards (overprovisioned for this small test). I
> confirmed using a small boto program that data is periodically written to
> all 20 of the shards. I can see that Spark has created 20 executors, one
> for each Kinesis shard. It also creates one other executor, tied to a
> particular worker node, and that node seems to do the RDD counting. The
> streaming interval is 1 minute, during which time several shards have
> received data. Each minute interval, for this particular example, the
> driver prints out between 20 and 30 for the count value. I expected to see
> the count operation parallelized across the cluster. I think I must just be
> misunderstanding something fundamental! Can anyone point out where I'm
> going wrong?
>
> Yours in confusion,
> Graham
>
>


-- 
---
Takeshi Yamamuro
