What are you passing as parameters to spark-submit? Spark runs one task per executor core, so if each executor is started with a single core you will see exactly one thread processing messages per executor. For example:

${SPARK_HOME}/bin/spark-submit \
    --executor-cores=12 \

Also check spark.executor.cores under "Execution Behavior" in the configuration docs:

http://spark.apache.org/docs/latest/configuration.html
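
If you would rather set it in code, here is a minimal sketch; the app name and the value 12 are just placeholders, and the batch duration is taken from your own driver code:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// one task runs per executor core, so more cores per executor means
// more of your Kafka partitions can be processed concurrently
SparkConf conf = new SparkConf()
        .setAppName("streaming-app")           // placeholder name
        .set("spark.executor.cores", "12");    // up to 12 concurrent tasks per executor
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));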


HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:

> Hi,
>
>
> I am working with Spark 1.6.1, using the Kafka direct stream for streaming data.
>
> Using the Spark scheduler and 3 slaves.
>
> The Kafka topic has 10 partitions.
>
>
> The problem I have is that only one thread per executor is running my function (the logic implementation).
>
>
> Can anybody tell me how I can increase the number of threads per executor to make better use of the CPUs?
>
>
> Thanks
>
>
> Here is the code I have implemented:
>
>
> *Driver*:
>
>
> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>
> // prepare streaming from Kafka
> Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>
> Map<String, String> kafkaParams = new HashMap<>();
> kafkaParams.put("metadata.broker.list", kafkaBrokers);
> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>
> JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
>         ssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams,
>         topicsSet);
>
> inputMessages.foreachRDD(new ForeachRDDFunction());
>
>
> *ForeachFunction*:
>
>
> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>
>     private static final Counter foreachConcurrent =
>             ProcessingMetrics.metrics.counter("foreach-concurrency");
>
>     public ForeachFunction() {
>         LOG.info("Creating a new ForeachFunction");
>     }
>
>     public void call(Tuple2<String, String> t) throws Exception {
>         foreachConcurrent.inc();
>         LOG.info("processing message [" + t._1() + "]");
>         try {
>             Thread.sleep(1000); // simulate one second of work per message
>         } catch (Exception e) { }
>         foreachConcurrent.dec();
>     }
> }
>
>
> *ForeachRDDFunction*:
>
>
> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>
>     private static final Counter foreachRDDConcurrent =
>             ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>
>     private ForeachFunction foreachFunction = new ForeachFunction();
>
>     public ForeachRDDFunction() {
>         LOG.info("Creating a new ForeachRDDFunction");
>     }
>
>     public void call(JavaPairRDD<String, String> t) throws Exception {
>         foreachRDDConcurrent.inc();
>         LOG.info("call from inputMessages.foreachRDD with [" + t.partitions().size() + "] partitions");
>         for (Partition p : t.partitions()) {
>             if (p instanceof KafkaRDDPartition) {
>                 LOG.info("partition [" + p.index() + "] with count [" + ((KafkaRDDPartition) p).count() + "]");
>             }
>         }
>         // submits the job without blocking; the returned JavaFutureAction is not awaited here
>         t.foreachAsync(foreachFunction);
>         foreachRDDConcurrent.dec();
>     }
> }
>
>
> *The driver log, which tells me my RDD is partitioned for parallel processing*:
>
>
> [Stage 70:>  (3 + 3) / 20][Stage 71:>  (0 + 0) / 20][Stage 72:>  (0 + 0) / 20]16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>
>
> *The log from one of the executors, showing that exactly one message per second was processed (by only one thread)*:
>
>
> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
>
>
