Hi Saulo,

I'm no expert, but I'll give it a try.
I would remove the rdd2.count(); I can't see the point of it, and you will
gain performance right away. Once the count is gone you no longer need the
transform step either: you can apply the map directly on the stream.
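Roughly the shape I have in mind (a sketch only, reusing your recordHandler
and casssave from the code below; I have not run it):

kafkaStream \
    .map(lambda w: recordHandler(w[1])) \
    .foreachRDD(casssave)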
I have not used Python, but in Scala the spark-cassandra-connector can save
a stream directly to Cassandra without a foreachRDD.
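If the anguenot:pyspark-cassandra package you already load behaves like the
Scala connector, I would expect something along these lines to work (again a
sketch, not tested; I am assuming its streaming module patches
saveToCassandra onto DStream):

import pyspark_cassandra.streaming  # assumption: adds saveToCassandra to DStream

kafkaStream \
    .map(lambda w: recordHandler(w[1])) \
    .saveToCassandra("test_hdpkns", "measurement")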

Finally, I would use the Spark UI to find out which stage is the bottleneck
here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
wrote:

> Hi all,
>
> I am implementing a use case where I read some sensor data from Kafka with
> the Spark Streaming interface (*KafkaUtils.createDirectStream*) and, after
> some transformations, write the output (RDD) to Cassandra.
>
> Everything is working properly, but I am having some trouble with the
> performance. My Kafka topic receives around 2000 messages per second. For a
> 4 min. test, the Spark Streaming app takes 6~7 min. to process and write to
> Cassandra, which is not acceptable for longer runs.
>
> I am running this application in a "sandbox" with 12GB of RAM, 2 cores and
> 30GB SSD space.
> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>
> I would like to know if you have any suggestions to improve performance
> (other than getting more resources :) ).
>
> My code (pyspark) is posted at the end of this email so you can take a
> look. I tried some different Cassandra configurations following this link:
> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
> (recommended on Stack Overflow for similar questions).
>
>
> Thank you in advance,
>
> Best Regards,
> Saulo
>
>
>
> =============== # CODE # =================================
> ####
> # run command:
> # spark2-submit --packages
> org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>  --conf spark.cassandra.connection.host='localhost' --num-executors 2
> --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
> ##
>
> # Run Spark imports
> from pyspark import SparkConf # SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> # Run Cassandra imports
> import pyspark_cassandra
> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>
> def recordHandler(record):
>     (mid, tt, in_tt, sid, mv) = parseData( record )
>     return processMetrics(mid, tt, in_tt, sid, mv)
>
> def process(time, rdd):
>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>     if rdd2.count() > 0:
>         return rdd2
>
> def casssave(time, rdd):
>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>
> # ...
> brokers, topic = sys.argv[1:]
>
> # ...
>
> sconf = SparkConf() \
>         .setAppName("SensorDataStreamHandler") \
>         .setMaster("local[*]") \
>         .set("spark.default.parallelism", "2")
>
> sc = CassandraSparkContext(conf = sconf)
> batchIntervalSeconds = 2
> ssc = StreamingContext(sc, batchIntervalSeconds)
>
> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
> {"metadata.broker.list": brokers})
>
> kafkaStream \
>     .transform(process) \
>     .foreachRDD(casssave)
>
> ssc.start()
> ssc.awaitTermination()
>
> ================================================
>
