You may have better luck with this question on the Spark Cassandra Connector mailing list.
One quick question about this code from your email:

    // Load DataFrame from C* data-source
    val base_data = base_data_df.getInstance(sqlContext)

What exactly is base_data_df and how are you creating it?

Mohammed
Author: Big Data Analytics with Spark
http://www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/

-----Original Message-----
From: bernh...@chapter7.ch [mailto:bernh...@chapter7.ch]
Sent: Tuesday, February 9, 2016 6:58 AM
To: user@spark.apache.org
Subject: [Spark Streaming] Joining Kafka and Cassandra DataFrames

All,

I'm new to Spark and I'm having a hard time doing a simple join of two DataFrames.

Intent:
I'm receiving data from Kafka via a direct stream and would like to enrich the messages with data from Cassandra. The Kafka messages (Protobufs) are decoded into DataFrames and then joined with a (supposedly pre-filtered) DataFrame from Cassandra. The ratio of (Kafka) streaming batch size to raw C* data is several streaming messages to millions of C* rows, BUT the join always yields exactly ONE result (1:1) per message. After the join, the resulting DataFrame is eventually stored to another C* table.

Problem:
Even though I'm joining the two DataFrames on the full Cassandra primary key and pushing the corresponding filter to C*, it seems that Spark loads the whole C* data set into memory before actually joining, which is exactly what I'd like to prevent by using the filter/predicate pushdown. This leads to a lot of shuffling and tasks being spawned, so the "simple" join takes forever.

Could anyone shed some light on this? In my perception this should be a prime example for DataFrames and Spark Streaming.
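For illustration, this is the kind of pushed-down read I was hoping for: a minimal, untested sketch with hardcoded key values (the keyspace/table names "xxx"/"base_data" and the literal values are placeholders, not my real code). My actual keys only arrive with each Kafka batch, which is why I try to express the same thing through the join in the code below.

    // Hypothetical sketch; assumes a SQLContext as in my code below.
    import sqlContext.implicits._

    val base_data_df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "xxx", "table" -> "base_data"))  // placeholder names
      .load()

    // With literal values on the full primary key, the connector should be able to
    // push these EqualTo predicates down to C* instead of scanning the whole table.
    val one_row = base_data_df.filter($"prim1_id" === 1L && $"prim2_id" === 2L)

    // The physical plan should then list them under PushedFilters.
    one_row.explain()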
Environment:
- Spark 1.6
- Cassandra 2.1.12
- Cassandra-Spark-Connector 1.5-RC1
- Kafka 0.8.2.2

Code:

    def main(args: Array[String]) {

      val conf = new SparkConf()
        .setAppName("test")
        .set("spark.cassandra.connection.host", "xxx")
        .set("spark.cassandra.connection.keep_alive_ms", "30000")
        .setMaster("local[*]")

      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.sparkContext.setLogLevel("INFO")

      // Initialise Kafka
      val kafkaTopics = Set[String]("xxx")
      val kafkaParams = Map[String, String](
        "metadata.broker.list" -> "xxx:32000,xxx:32000,xxx:32000,xxx:32000",
        "auto.offset.reset" -> "smallest")

      // Kafka stream
      val messages = KafkaUtils.createDirectStream[String, MyMsg, StringDecoder, MyMsgDecoder](
        ssc, kafkaParams, kafkaTopics)

      // Executed on the driver
      messages.foreachRDD { rdd =>

        // Create an instance of SQLContext
        val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
        import sqlContext.implicits._

        // Map MyMsg RDD
        val MyMsgRdd = rdd.map { case (key, msg) => msg }

        // Convert RDD[MyMsg] to DataFrame
        val MyMsgDf = MyMsgRdd.toDF()
          .select(
            $"prim1Id" as 'prim1_id,
            $"prim2Id" as 'prim2_id,
            $...
          )

        // Load DataFrame from C* data-source
        val base_data = base_data_df.getInstance(sqlContext)

        // Join on prim1_id and prim2_id
        val joinedDf = MyMsgDf.join(base_data,
            MyMsgDf("prim1_id") === base_data("prim1_id") &&
            MyMsgDf("prim2_id") === base_data("prim2_id"), "left")
          .filter(base_data("prim1_id").isin(MyMsgDf("prim1_id")) &&
                  base_data("prim2_id").isin(MyMsgDf("prim2_id")))

        joinedDf.show()
        joinedDf.printSchema()

        // Select relevant fields

        // Persist
      }

      // Start the computation
      ssc.start()
      ssc.awaitTermination()
    }

SO: http://stackoverflow.com/questions/35295182/joining-kafka-and-cassandra-dataframes-in-spark-streaming-ignores-c-predicate-p
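P.S. One alternative I'm considering, in case the DataFrame join can't be made to push the keys down: the connector's RDD-level joinWithCassandraTable, which issues per-key lookups instead of a full table scan. This is a rough, untested sketch; the keyspace/table names ("xxx"/"base_data") and the MsgKey case class are placeholders for my real schema.

    import com.datastax.spark.connector._

    // Placeholder projection of the decoded Kafka message onto the C* primary key
    case class MsgKey(prim1_id: Long, prim2_id: Long)

    messages.foreachRDD { rdd =>
      val keyRdd = rdd.map { case (_, msg) => MsgKey(msg.prim1Id, msg.prim2Id) }

      // One C* lookup per key instead of loading the whole table
      val joined = keyRdd.joinWithCassandraTable("xxx", "base_data")
        .on(SomeColumns("prim1_id", "prim2_id"))

      joined.take(10).foreach(println)  // or map to the target schema and save to another C* table
    }

If anyone can confirm whether this is the recommended route for this kind of stream enrichment, that would help as well.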