I am seeing a similar issue when reading from Kafka.  I have a single Kafka
broker with 1 topic and 10 partitions on a separate machine.  I have a
three-node Spark cluster and have verified that all workers are registered
with the master.  I'm initializing Kafka using a method similar to the one in
this article:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/.
I create 3 InputDStreams and union them together into a single unified
stream, which I then repartition to 6 partitions:

val streams = Range(0, configuration.numStreams)
  .map(x => {
    logger.info("Starting Setup of Kafka Stream #" + x + ": " +
      "\n\tZookeepers: " + zookeepersToUse.mkString(",") +
      "\n\tBrokers: " + brokersToUse.mkString(",") +
      "\n\tTopics: " + topicsToUse.mkString(","))
    KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse, topicsToUse)
  }).toArray

val unionStream = ssc.union(streams)
if (configuration.threadParallelism > 0) {
  unionStream.repartition(configuration.threadParallelism)
}
unionStream
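
(For reference, a minimal sketch of what KafkaStreamFactory.createKafkaStream
looks like, assuming the receiver-based KafkaUtils.createStream approach from
the article above; the consumer group id and per-topic thread count here are
illustrative:)

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamFactory {
  // Receiver-based stream, as in the linked tutorial: each call creates one
  // receiver. The broker list is not used directly here; the receiver-based
  // API discovers brokers through ZooKeeper.
  def createKafkaStream(ssc: StreamingContext,
                        brokers: Seq[String],
                        zookeepers: Seq[String],
                        topics: Seq[String]): DStream[(String, String)] = {
    val topicMap = topics.map(t => t -> 1).toMap  // 1 consumer thread per topic
    KafkaUtils.createStream(ssc, zookeepers.mkString(","),
      "spark-consumer-group", topicMap)
  }
}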


I am submitting the job to Spark using the following options:

/spark/spark-1.4.1/bin/spark-submit \
  --deploy-mode client --supervise \
  --master "spark://sparkserver:7077" \
  --conf spark.logConf=true \
  --conf spark.default.parallelism=6 \
  --conf spark.streaming.receiver.maxRate=500 \
  --class MainClass \
  "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar" \
  --checkpoint /tmp/sparkcheckpoint --broker kafkaBroker:9092 --topic test \
  --numStreams 9 --threadParallelism 9
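
(Equivalently, the --conf flags above could be set programmatically on the
SparkConf; a rough sketch, with the app name being illustrative:)

import org.apache.spark.SparkConf

// Programmatic equivalent of the --conf flags passed to spark-submit above.
val sparkConf = new SparkConf()
  .setAppName("MainClass")
  .setMaster("spark://sparkserver:7077")
  .set("spark.logConf", "true")
  .set("spark.default.parallelism", "6")
  .set("spark.streaming.receiver.maxRate", "500")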

Even when I put a long-running job in the queue, the other nodes remain
completely idle.

Am I missing something obvious?

Regards,

Bryan Jeffrey





On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das <ak...@sigmoidanalytics.com>
wrote:

> The number of parallel tasks depends entirely on the number of partitions
> you have. If you aren't getting enough partitions (you want more partitions
> than the total number of cores), then try a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine, but it is completely underusing N2. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics, but when it comes to writing to Elasticsearch, it does distribute
>> the data across all 4 executors, which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly across all
>> 4 executors from the get-go, and why it only does so in the final step of
>> the pipeline, which seems counterproductive as well.
>>
>> CPU usage on N1 is near its peak, while on N2 it is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>
