Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join.
It is a basic join, but it is taking a long time to join the stream data. I am
not sure whether there is any configuration we need to set in Spark.

Code:
*************************
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

/**
 * Structured Streaming job that joins an order topic with an invoice topic on the
 * order id and publishes one JSON record per order id to a destination Kafka topic.
 *
 * Command-line arguments:
 *   - 0: value handed to `setEnvironmentVariables`
 *   - 1: Kafka topic carrying orders
 *   - 2: Kafka topic carrying invoices
 *   - 3: destination Kafka topic (also used as the checkpoint directory name)
 *   - 4: watermark delay for the order stream, e.g. "10 minutes"
 *   - 5: watermark delay for the invoice stream
 *   - 6: (optional) maximum time distance between an order and its invoice, e.g.
 *        "1 hour". When present it is added to the join condition as a time-range
 *        bound; when absent the join matches on the order id only, as before.
 *
 * NOTE(review): the columns kept in the join differ from the previous version of this
 * job, so the operator state layout differs too - presumably this needs a fresh
 * checkpoint directory rather than a restart from an old one; confirm before deploying.
 *
 * NOTE(review): `spark.sql.shuffle.partitions` is left at whatever the submitter
 * configured (Spark's documented default is 200). For a low-volume stream a much
 * smaller value passed via `--conf` is worth trying - confirm against the cluster size.
 */
object OrderSalesJoin {

  import org.apache.spark.sql.{Column, DataFrame}

  private val Usage: String =
    "Usage: OrderSalesJoin <env> <order_topic> <invoice_topic> <dest_topic> " +
      "<order_watermark> <invoice_watermark> [join_window]"

  /** Opens a streaming Kafka source subscribed to `topic`, reading from the latest offsets. */
  private def kafkaStream(spark: SparkSession, topic: String): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      // NOTE(review): "replica.fetch.max.bytes" looks like a broker-side setting; verify
      // that it has any effect on the consumer created by this source.
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()

  /** Extracts the top-level field `name` from a JSON string column. */
  private def jsonField(json: Column, name: String): Column =
    get_json_object(json, "$." + name)

  def main(args: Array[String]): Unit = {
    // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
    require(args.length >= 6, Usage)

    setEnvironmentVariables(args(0))

    val orderTopic = args(1)
    val invoiceTopic = args(2)
    val destTopic = args(3)
    val orderWatermark = args(4)
    val invoiceWatermark = args(5)
    val joinWindow = args.drop(6).headOption.map(_.trim).filter(_.nonEmpty)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpointPath = HDFS_HOST + "/checkpoints/" + destTopic

    // Cast the Kafka payload to a string once and reuse it for every extracted field.
    val payload = col("value").cast("string")

    // Only the columns needed downstream are kept: the stream-stream join buffers its
    // input rows as state, so every extra column is stored for every buffered row.
    val orderDetails = kafkaStream(spark, orderTopic).select(
      jsonField(payload, "customer_id").as("s_customer_id"),
      jsonField(payload, "order_id").as("s_order_id"),
      jsonField(payload, "promotion_id").as("s_promotion_id"),
      jsonField(payload, "store_id").as("s_store_id"),
      jsonField(payload, "product_id").as("s_product_id"),
      jsonField(payload, "warehouse_id").as("s_warehouse_id"),
      jsonField(payload, "unit_cost").cast("integer").as("unit_cost"),
      jsonField(payload, "total_cost").cast("integer").as("total_cost"),
      jsonField(payload, "promotion_cost").cast("integer").as("promotion_cost"),
      jsonField(payload, "date_of_order").as("date_of_order"),
      jsonField(payload, "units_sold").cast("integer").as("units_sold"),
      // Processing time, not a timestamp taken from the message, drives the watermark.
      current_timestamp().as("tstamp_trans"))

    // Successful invoices only, reduced to the join key plus a timestamp. The timestamp
    // gets its own name so a time-range join condition can refer to both sides.
    val invoiceDetails = kafkaStream(spark, invoiceTopic)
      .select(
        jsonField(payload, "order_id").as("order_id"),
        jsonField(payload, "invoice_status").as("invoice_status"))
      .where(col("invoice_status") === "Success")
      .select(col("order_id"), current_timestamp().as("invoice_tstamp"))

    val orderWm = orderDetails.withWatermark("tstamp_trans", orderWatermark)
    val invoiceWm = invoiceDetails.withWatermark("invoice_tstamp", invoiceWatermark)

    val sameOrder = orderWm.col("s_order_id") === invoiceWm.col("order_id")

    // Watermarks alone do not let Spark discard buffered join rows; a time-range
    // condition between the two sides is needed as well. It is opt-in (7th argument)
    // because it also limits which order/invoice pairs can match.
    val joinCondition = joinWindow.fold(sameOrder) { window =>
      sameOrder && expr(
        s"invoice_tstamp >= tstamp_trans - interval $window AND " +
          s"invoice_tstamp <= tstamp_trans + interval $window")
    }

    val joined = orderWm
      .join(invoiceWm, joinCondition)
      .select(
        col("s_customer_id"), col("s_promotion_id"), col("s_store_id"),
        col("s_product_id"), col("s_warehouse_id"),
        col("unit_cost"), col("total_cost"), col("promotion_cost"),
        col("date_of_order"), col("units_sold"), col("order_id"))

    // NOTE(review): de-duplicating on order_id alone (no event-time column) means the
    // set of seen ids is never trimmed; confirm this is acceptable for long-running jobs.
    val output = joined
      .dropDuplicates("order_id")
      .select(
        to_json(struct(
          col("s_customer_id"), col("s_promotion_id"), col("s_store_id"),
          col("s_product_id"), col("s_warehouse_id"),
          col("unit_cost"), col("total_cost"), col("promotion_cost"),
          col("date_of_order"), col("units_sold"), col("order_id"))).as("value"))

    val query = output.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", destTopic)
      .option("checkpointLocation", checkpointPath)
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()

    query.awaitTermination()
  }

}
****************************

Every run is taking more than a minute, and the waiting
time keeps increasing as the data grows.

Please let me know if there is anything we need to configure to make it faster. I tried
increasing the parallelism.

Executors: I tried from 1 to 4, and the memory I gave is 3 GB. The data volume is
very low. Even for a single record it takes this long.

Reply via email to