--executor-cores 1 to be exact.
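
(In standalone mode every task occupies one core's slot, so a one-core VM can run at most one task, i.e. one copy of the processing function, per executor at a time. As a minimal sketch, reusing the master URL, class, and jar from the command quoted further down, a submit line these one-core workers can actually satisfy would look like:

    bin/spark-submit \
      --master spark://dev1.dev:7077 \
      --class com.example.SparkStreamingImpl \
      --executor-cores 1 \
      --executor-memory 1g \
      /home/Processing.jar

Anything above 1 core per executor will be refused until the workers advertise more cores; see the note on SPARK_WORKER_CORES further down.)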
Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Interesting. A VM with one core!
>
> One simple test: can you try running with
>
>     --executor-cores=1
>
> and see if it works OK, please?
>
>
> Dr Mich Talebzadeh
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
> http://talebzadehmich.wordpress.com
>
>
> On 2 June 2016 at 23:15, Andres M Jimenez T <ad...@hotmail.com> wrote:
>
>> Mich, thanks for your time.
>>
>> I am launching spark-submit as follows:
>>
>> bin/spark-submit --class com.example.SparkStreamingImpl --master spark://dev1.dev:7077 \
>>   --verbose --driver-memory 1g --executor-memory 1g \
>>   --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8091 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>   --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8092 -Dcom.sun.management.jmxremote.rmi.port=8093 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
>>   --conf "spark.scheduler.mode=FAIR" \
>>   /home/Processing.jar
>>
>> When I use --executor-cores=12 I get "Initial job has not accepted any
>> resources; check your cluster UI to ensure that workers are registered and
>> have sufficient resources".
>>
>> This is because my nodes are single-core, but I want to use more than one
>> thread per core. Is this possible?
>>
>> root@dev1:/home/spark-1.6.1-bin-hadoop2.6# lscpu
>> Architecture:          x86_64
>> CPU op-mode(s):        32-bit, 64-bit
>> Byte Order:            Little Endian
>> CPU(s):                1
>> On-line CPU(s) list:   0
>> Thread(s) per core:    1
>> Core(s) per socket:    1
>> Socket(s):             1
>> NUMA node(s):          1
>> Vendor ID:             GenuineIntel
>> CPU family:            6
>> Model:                 58
>> Model name:            Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz
>> Stepping:              0
>> CPU MHz:               2999.999
>> BogoMIPS:              5999.99
>> Hypervisor vendor:     VMware
>> Virtualization type:   full
>> L1d cache:             32K
>> L1i cache:             32K
>> L2 cache:              256K
>> L3 cache:              25600K
>> NUMA node0 CPU(s):     0
>>
>> Thanks
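
(On the "more than one thread per core" question: in standalone mode it is possible to run more task slots than physical cores, because a worker simply advertises however many cores it is configured with. A sketch only, assuming the standard standalone setting SPARK_WORKER_CORES in conf/spark-env.sh is raised on each 1-core worker; the value 4 is an arbitrary assumption:

    # conf/spark-env.sh on each worker node (restart the workers afterwards)
    export SPARK_WORKER_CORES=4    # advertise 4 task slots despite 1 physical core

    # then request the slots at submit time
    bin/spark-submit --master spark://dev1.dev:7077 --executor-cores 4 ...

The tasks will still time-share the single physical core, so this only helps workloads that mostly sleep or wait on I/O, as the per-message Thread.sleep below does.)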
>>
>> ------------------------------
>> From: Mich Talebzadeh <mich.talebza...@gmail.com>
>> Sent: Thursday, June 2, 2016 5:00 PM
>> To: Andres M Jimenez T
>> Cc: user@spark.apache.org
>> Subject: Re: how to increase threads per executor
>>
>> What are you passing as parameters to spark-submit?
>>
>> ${SPARK_HOME}/bin/spark-submit \
>>   --executor-cores=12 \
>>
>> Also check spark.executor.cores (under Execution Behavior) in
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> On 2 June 2016 at 17:29, Andres M Jimenez T <ad...@hotmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am working with Spark 1.6.1, using the Kafka direct stream for
>>> streaming data, with the Spark standalone scheduler and 3 slaves.
>>> Each Kafka topic is partitioned with a value of 10.
>>>
>>> The problem I have is that only one thread per executor is running my
>>> function (the logic implementation).
>>>
>>> Can anybody tell me how I can increase the threads per executor to get
>>> better use of the CPUs?
>>>
>>> Thanks
>>>
>>> Here is the code I have implemented.
>>>
>>> Driver:
>>>
>>> JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));
>>>
>>> // prepare streaming from Kafka (two topics, 10 partitions each)
>>> Set<String> topicsSet = new HashSet<>(Arrays.asList("stage1-in,stage1-retry".split(",")));
>>> Map<String, String> kafkaParams = new HashMap<>();
>>> kafkaParams.put("metadata.broker.list", kafkaBrokers);
>>> kafkaParams.put("group.id", SparkStreamingImpl.class.getName());
>>>
>>> JavaPairInputDStream<String, String> inputMessages = KafkaUtils.createDirectStream(
>>>         ssc,
>>>         String.class,
>>>         String.class,
>>>         StringDecoder.class,
>>>         StringDecoder.class,
>>>         kafkaParams,
>>>         topicsSet);
>>>
>>> inputMessages.foreachRDD(new ForeachRDDFunction());
>>>
>>> ForeachFunction:
>>>
>>> class ForeachFunction implements VoidFunction<Tuple2<String, String>> {
>>>
>>>     private static final Counter foreachConcurrent =
>>>             ProcessingMetrics.metrics.counter("foreach-concurrency");
>>>
>>>     public ForeachFunction() {
>>>         LOG.info("Creating a new ForeachFunction");
>>>     }
>>>
>>>     public void call(Tuple2<String, String> t) throws Exception {
>>>         foreachConcurrent.inc();
>>>         LOG.info("processing message [" + t._1() + "]");
>>>         try {
>>>             Thread.sleep(1000);   // simulates one second of work per message
>>>         } catch (Exception e) { }
>>>         foreachConcurrent.dec();
>>>     }
>>> }
>>>
>>> ForeachRDDFunction:
>>>
>>> class ForeachRDDFunction implements VoidFunction<JavaPairRDD<String, String>> {
>>>
>>>     private static final Counter foreachRDDConcurrent =
>>>             ProcessingMetrics.metrics.counter("foreachRDD-concurrency");
>>>
>>>     private ForeachFunction foreachFunction = new ForeachFunction();
>>>
>>>     public ForeachRDDFunction() {
>>>         LOG.info("Creating a new ForeachRDDFunction");
>>>     }
>>>
>>>     public void call(JavaPairRDD<String, String> t) throws Exception {
>>>         foreachRDDConcurrent.inc();
>>>         LOG.info("call from inputMessages.foreachRDD with ["
>>>                 + t.partitions().size() + "] partitions");
>>>         for (Partition p : t.partitions()) {
>>>             if (p instanceof KafkaRDDPartition) {
>>>                 LOG.info("partition [" + p.index() + "] with count ["
>>>                         + ((KafkaRDDPartition) p).count() + "]");
>>>             }
>>>         }
>>>         t.foreachAsync(foreachFunction);   // note: the returned JavaFutureAction is discarded
>>>         foreachRDDConcurrent.dec();
>>>     }
>>> }
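
(One executor core means one task at a time, and each task walks its partition serially, so the per-message Thread.sleep(1000) caps each executor at one message per second. If the goal is more threads per core rather than more cores, the threading can also be done inside the task. A sketch only, not the poster's code: it swaps foreachAsync for foreachPartition and fans each partition out to a local pool. ForeachPartitionFunction is a new, illustrative class; the pool size of 8 is arbitrary; LOG is assumed available as in the classes above:

    // imports assumed: java.util.*, java.util.concurrent.*,
    // org.apache.spark.api.java.function.VoidFunction, scala.Tuple2
    class ForeachPartitionFunction implements VoidFunction<Iterator<Tuple2<String, String>>> {

        public void call(Iterator<Tuple2<String, String>> messages) throws Exception {
            // one short-lived pool per task; 8 threads is an arbitrary illustration
            ExecutorService pool = Executors.newFixedThreadPool(8);
            try {
                List<Future<?>> pending = new ArrayList<>();
                while (messages.hasNext()) {
                    final Tuple2<String, String> t = messages.next();
                    pending.add(pool.submit(new Runnable() {
                        public void run() {
                            LOG.info("processing message [" + t._1() + "]");
                            try {
                                Thread.sleep(1000);   // the same simulated 1s of work
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            }
                        }
                    }));
                }
                for (Future<?> f : pending) {
                    f.get();   // wait for the whole partition and surface failures
                }
            } finally {
                pool.shutdown();
            }
        }
    }

    // in ForeachRDDFunction.call, instead of t.foreachAsync(foreachFunction):
    // t.foreachPartition(new ForeachPartitionFunction());

With 20 partitions across 3 workers and 8 threads per task, the one-second sleeps overlap instead of serializing; for genuinely CPU-bound work this buys nothing, since the threads still share one physical core.)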
>>>
>>> The log from the driver that tells me my RDD is partitioned to process
>>> in parallel:
>>>
>>> [Stage 70:> (3 + 3) / 20][Stage 71:> (0 + 0) / 20][Stage 72:> (0 + 0) / 20]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: call from inputMessages.foreachRDD with [20] partitions
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [0] with count [24]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [1] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [2] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [3] with count [19]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [4] with count [19]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [5] with count [20]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [6] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [7] with count [23]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [8] with count [21]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [9] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [10] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [11] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [12] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [13] with count [26]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [14] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [15] with count [27]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [16] with count [0]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [17] with count [16]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [18] with count [15]
>>> 16/06/02 08:32:10 INFO SparkStreamingImpl: partition [19] with count [0]
>>>
>>> The log from one of the executors, showing that exactly one message per
>>> second was processed (i.e. by only one thread):
>>>
>>> 16/06/02 08:32:46 INFO SparkStreamingImpl: processing message [f2b22bb9-3bd8-4e5b-b9fb-afa7e8c4deb8]
>>> 16/06/02 08:32:47 INFO SparkStreamingImpl: processing message [e267cde2-ffea-4f7a-9934-f32a3b7218cc]
>>> 16/06/02 08:32:48 INFO SparkStreamingImpl: processing message [f055fe3c-0f72-4f41-9a31-df544f1e1cd3]
>>> 16/06/02 08:32:49 INFO SparkStreamingImpl: processing message [854faaa5-0abe-49a2-b13a-c290a3720b0e]
>>> 16/06/02 08:32:50 INFO SparkStreamingImpl: processing message [1bc0a141-b910-45fe-9881-e2066928fbc6]
>>> 16/06/02 08:32:51 INFO SparkStreamingImpl: processing message [67fb99c6-1ca1-4dfb-bffe-43b927fdec07]
>>> 16/06/02 08:32:52 INFO SparkStreamingImpl: processing message [de7d5934-bab2-4019-917e-c339d864ba18]
>>> 16/06/02 08:32:53 INFO SparkStreamingImpl: processing message [e63d7a7e-de32-4527-b8f1-641cfcc8869c]
>>> 16/06/02 08:32:54 INFO SparkStreamingImpl: processing message [1ce931ee-b8b1-4645-8a51-2c697bf1513b]
>>> 16/06/02 08:32:55 INFO SparkStreamingImpl: processing message [5367f3c1-d66c-4647-bb44-f5eab719031d]
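
(Two things can be read off these logs. The [20] partitions are the two topics times ten Kafka partitions each, since the direct stream creates one RDD partition per Kafka topic-partition, so input parallelism is not the limit; the one-message-per-second cadence on the executor is, and it matches exactly one task thread per one-core executor. Separately, foreachAsync returns a handle that the ForeachRDDFunction above discards, so a failed job would go unnoticed. A minimal sketch of keeping it, using the same variables as in ForeachRDDFunction.call:

    // import org.apache.spark.api.java.JavaFutureAction;
    JavaFutureAction<Void> job = t.foreachAsync(foreachFunction);
    // ... other work can be submitted here; then:
    job.get();   // blocks until the job finishes and rethrows any task failure
)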