Hi Saulo,

I'm no expert, but I'll give it a try. I would remove the rdd2.count(): I can't see what it buys you, and since count() is an action it forces an extra pass over every batch, so dropping it should gain you performance right away. With the count gone there is no reason for the transform either; just call map on the stream directly. I haven't used the Python connector, but in Scala the spark-cassandra-connector can save a stream to Cassandra directly, without a foreachRDD.
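Something along these lines, reusing the recordHandler, pyspark_cassandra imports and ssc/brokers/topic setup from your script (untested on my side, so treat it as a sketch rather than working code):

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
                                            {"metadata.broker.list": brokers})

# Map each Kafka (key, value) pair straight to the processed record;
# no transform() wrapper and no count(), so each batch is only traversed once.
processed = kafkaStream.map(lambda w: recordHandler(w[1]))

# Saving an empty batch is cheap, so just write every RDD as it arrives.
processed.foreachRDD(lambda rdd: rdd.saveToCassandra("test_hdpkns", "measurement"))

If the Python connector also exposes saveToCassandra on the DStream itself (the Scala one does), you could drop the foreachRDD as well.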
Finally, I would use the Spark UI to find out which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:

> Hi all,
>
> I am implementing a use case where I read some sensor data from Kafka with
> the SparkStreaming interface (*KafkaUtils.createDirectStream*) and, after
> some transformations, write the output (RDD) to Cassandra.
>
> Everything is working properly, but I am having some trouble with
> performance. My Kafka topic receives around 2000 messages per second. For a
> 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to
> Cassandra, which is not acceptable for longer runs.
>
> I am running this application in a "sandbox" with 12GB of RAM, 2 cores and
> 30GB of SSD space.
> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>
> I would like to know if you have any suggestions to improve performance
> (other than getting more resources :) ).
>
> My code (pyspark) is posted at the end of this email so you can take a
> look. I tried some different Cassandra configurations following this link:
> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
> (recommended on Stack Overflow for similar questions).
>
> Thank you in advance,
>
> Best Regards,
> Saulo
>
>
> =============== # CODE # =================================
> ####
> # run command:
> # spark2-submit --packages
> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
> #   --conf spark.cassandra.connection.host='localhost' --num-executors 2
> #   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
> ##
>
> # Spark imports
> from pyspark import SparkConf  # SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> # Cassandra imports
> import pyspark_cassandra
> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>
> def recordHandler(record):
>     (mid, tt, in_tt, sid, mv) = parseData(record)
>     return processMetrics(mid, tt, in_tt, sid, mv)
>
> def process(time, rdd):
>     rdd2 = rdd.map(lambda w: recordHandler(w[1]))
>     if rdd2.count() > 0:
>         return rdd2
>
> def casssave(time, rdd):
>     rdd.saveToCassandra("test_hdpkns", "measurement")
>
> # ...
> brokers, topic = sys.argv[1:]
>
> # ...
>
> sconf = SparkConf() \
>     .setAppName("SensorDataStreamHandler") \
>     .setMaster("local[*]") \
>     .set("spark.default.parallelism", "2")
>
> sc = CassandraSparkContext(conf=sconf)
> batchIntervalSeconds = 2
> ssc = StreamingContext(sc, batchIntervalSeconds)
>
> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>                                             {"metadata.broker.list": brokers})
>
> kafkaStream \
>     .transform(process) \
>     .foreachRDD(casssave)
>
> ssc.start()
> ssc.awaitTermination()
>
> ================================================