Hello all, I am using Spark 2.3 and trying to write a Structured Streaming stream-stream join. It is a basic inner join, yet joining the two streams is taking a long time, and I am not sure whether there is any configuration we need to set on Spark.
Code:
*************************
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {

  def main(args: Array[String]): Unit = {

    setEnvironmentVariables(args(0))

    val order_topic = args(1)
    val invoice_topic = args(2)
    val dest_topic_name = args(3)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

    import spark.implicits._

    // Read the order stream from Kafka
    val order_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", order_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()

    // Read the invoice stream from Kafka
    val invoice_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", invoice_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()

    // Extract the order fields from the JSON value and tag each row
    // with a processing-time timestamp
    val order_details = order_df
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost",
        $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP",
        $"units_sold".cast("integer") as "units_sold")

    // Extract the invoice fields and keep only successful invoices
    val invoice_details = invoice_df
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))

    // Watermarks on the processing-time column; durations come from the command line
    val order_wm = order_details.withWatermark("tstamp_trans", args(4))
    val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

    // Stream-stream inner join on the order id
    val join_df = order_wm
      .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost", $"promotion_cost",
        $"date_of_order", $"units_sold" as "units_sold", $"order_id")

    // Re-serialize the joined row as JSON and deduplicate on order_id
    val final_ids = join_df
      .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id",
        $"s_product_id", $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost",
        $"total_cost".cast("Int") as "total_cost",
        $"promotion_cost".cast("Int") as "promotion_cost",
        $"date_of_order", $"units_sold".cast("Int") as "units_sold", $"order_id")))
      .dropDuplicates("order_id")
      .select("value")

    // Write the joined stream back to Kafka, triggering every second
    val write_df = final_ids
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", dest_topic_name)
      .option("checkpointLocation", checkpoint_path)
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    write_df.awaitTermination()
  }
}
****************************

Each micro-batch is taking more than a minute to complete, and the wait time keeps increasing as the data grows. Please let me know if there is anything we need to configure to make this faster. I have already tried increasing the parallelism: I varied the executor count from 1 to 4, with 3 GB of memory per executor. The data volume is very low; even a single record takes this long to process.