Can anyone help me? I am confused. :(

On Wed, May 15, 2019 at 7:28 PM Chetan Khatri <chetan.opensou...@gmail.com>
wrote:

> Hello Spark Developers,
>
> I have a question about a Spark join I am doing.
>
> I have a full load of data from an RDBMS stored on HDFS, let's say:
>
> val historyDF = spark.read.parquet("/home/test/transaction-line-item")
>
> and I am getting the changed (delta) data at a separate HDFS path, let's say:
>
> val deltaDF = spark.read.parquet("/home/test/transaction-line-item-delta")
>
> Now I would like to take the rows from deltaDF, ignoring the matching records
> in historyDF, and write them to a MySQL table.
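>
> For reference, the MySQL write I have in mind is a plain JDBC append, something
> like this (the URL, table name, and credentials are placeholders):
>
> import java.util.Properties
> import org.apache.spark.sql.SaveMode
>
> val jdbcProps = new Properties()
> jdbcProps.setProperty("user", "test")                      // assumed credentials
> jdbcProps.setProperty("password", "test")
> jdbcProps.setProperty("driver", "com.mysql.cj.jdbc.Driver")
>
> deltaDF.write
>   .mode(SaveMode.Append)
>   .jdbc("jdbc:mysql://localhost:3306/testdb",              // assumed database
>     "transaction_line_item",                               // assumed table
>     jdbcProps)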
>
> Once I am done writing to the MySQL table, I would like to overwrite
> /home/test/transaction-line-item with the merged data. But I can't simply
> overwrite that path: because of lazy evaluation and the DAG structure, Spark
> would still be reading from it while writing, unless I write somewhere else
> first and then write back as overwrite.
>
> val syncDataDF = historyDF
>   .join(deltaDF.select("TRANSACTION_BY_LINE_ID", "sys_change_column"),
>     Seq("TRANSACTION_BY_LINE_ID"), "left_outer")
>   .filter(deltaDF.col("sys_change_column").isNull)
>   .drop(deltaDF.col("sys_change_column"))
>
> // drop the change-tracking column so the schemas line up for the union
> val mergedDataDF = syncDataDF.union(deltaDF.drop("sys_change_column"))
>
> I believe this could be done with the join alone, without the union. Please
> suggest the best approach.
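>
> For example, I was thinking the left_outer + filter + drop could collapse into
> a single left_anti join, though I still end up with a union to append the
> delta rows (untested sketch):
>
> // history rows that have no matching delta row
> val unchangedDF = historyDF.join(
>   deltaDF.select("TRANSACTION_BY_LINE_ID"),
>   Seq("TRANSACTION_BY_LINE_ID"), "left_anti")
>
> // append the changed rows, again dropping the change-tracking column
> val mergedViaAntiDF = unchangedDF.union(deltaDF.drop("sys_change_column"))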
>
> Since I can't write mergedDataDF straight back to historyDF's path while I am
> still reading from it, what I am doing is writing to a temp path, reading that
> back, and then writing over the original path. Which is a bad idea, so I need a
> suggestion here...
>
>
> mergedDataDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item-temp/")
> val tempMergedDF = spark.read.parquet("/home/test/transaction-line-item-temp/")
> tempMergedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")
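>
> One alternative I have been considering is an eager checkpoint, which
> materializes the result and truncates the lineage, so the original path could
> be overwritten without the temp round trip (untested sketch; the checkpoint
> directory is a placeholder):
>
> // assumed checkpoint location on HDFS
> spark.sparkContext.setCheckpointDir("/home/test/checkpoints")
>
> // checkpoint() is eager by default: it writes the data out and cuts the DAG,
> // so the overwrite no longer depends on the parquet files it replaces
> val materializedDF = mergedDataDF.checkpoint()
> materializedDF.write.mode(SaveMode.Overwrite).parquet("/home/test/transaction-line-item")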
>
>
> Please suggest the best approach.
>
>
> Thanks
>
