I am seeing a similar issue when reading from Kafka. I have a single Kafka broker with one topic and 10 partitions on a separate machine, and a three-node Spark cluster; I have verified that all workers are registered with the master. I'm initializing the Kafka streams using a method similar to the one in this article: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/. I create 3 InputDStreams and union them into a single stream, which I then repartition to 6 partitions:
val streams = Range(0, configuration.numStreams)
  .map(x => {
    logger.info("Starting Setup of Kafka Stream #" + x + ": \n\tZookeepers: " + zookeepersToUse.mkString(",") +
      "\n\tBrokers: " + brokersToUse.mkString(",") +
      "\n\tTopics: " + topicsToUse.mkString(","))
    KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse, topicsToUse)
  }).toArray

val unionStream = ssc.union(streams)
if (configuration.threadParallelism > 0) {
  unionStream.repartition(configuration.threadParallelism)
}
unionStream

I am submitting the job to Spark using the following options:

/spark/spark-1.4.1/bin/spark-submit \
  --deploy-mode client \
  --supervise \
  --master "spark://sparkserver:7077" \
  --conf spark.logConf=true \
  --conf spark.default.parallelism=6 \
  --conf spark.streaming.receiver.maxRate=500 \
  --class MainClass \
  "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar" \
  --checkpoint /tmp/sparkcheckpoint \
  --broker kafkaBroker:9092 \
  --topic test \
  --numStreams 9 \
  --threadParallelism 9

Even when I put a long-running job in the queue, the other nodes remain completely idle. Am I missing something obvious?

Regards,

Bryan Jeffrey

On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> The number of parallel tasks depends entirely on the number of partitions you have. If you are not receiving enough partitions (you want partitions > total # of cores), try a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>
>> Hello all,
>>
>> I have a Spark Streaming application that reads from a Flume stream and performs quite a few maps/filters, in addition to a few reduceByKeyAndWindow and join operations, before writing the analyzed output to ElasticSearch inside a foreachRDD().
>>
>> I recently started to run this on a two-node standalone cluster, with the driver program submitting directly to the Spark master on the same host. I have divided the resources as follows:
>>
>> N1: Spark master + driver + Flume + 2 Spark workers (16 GB + 6 cores per worker)
>> N2: 2 Spark workers (16 GB + 8 cores per worker)
>>
>> The application works just fine, but it underuses N2 completely. It seems to use N1 (note that both executors on N1 get used) for all the analytics, but when it comes to writing to ElasticSearch it does distribute the data across all 4 executors, which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly across all 4 executors from the start, and why it only does so in the final step of the pipeline, which seems counterproductive as well.
>>
>> CPU usage on N1 is near its peak, while N2 is at less than 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 would be welcome.
>>
>> Thanks in advance,
>> Nikunj
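For reference, the .repartition suggestion above would look roughly like the minimal sketch below, assuming the receiver-based KafkaUtils.createStream API from Spark Streaming 1.4 and placeholder Zookeeper, consumer-group, and topic values (substitute your own). The key detail is that DStream.repartition returns a new DStream rather than changing the stream in place, so its result has to be captured and used downstream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-union-repartition")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Placeholder values -- substitute your own Zookeeper quorum, consumer group and topic map.
    val zkQuorum = "zkhost:2181"
    val groupId = "test-consumer"
    val topics = Map("test" -> 1)
    val numStreams = 3
    val numPartitions = 6

    // One receiver-based input stream per iteration; each receiver occupies one core for its lifetime.
    val streams = (0 until numStreams).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
    }

    // repartition() returns a NEW DStream; calling it and discarding the
    // return value leaves the original partitioning untouched.
    val unioned = ssc.union(streams)
    val repartitioned = unioned.repartition(numPartitions)

    // Downstream operations must be applied to the repartitioned stream.
    repartitioned.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that in the snippet at the top of this message, the result of unionStream.repartition(configuration.threadParallelism) is never assigned to anything, so the unioned stream that is returned keeps only as many partitions as there are receivers.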