Hi All, I have a Hive table where data from 2 different sources (S1 and S2) gets accumulated. Sample data below:

    RECORD_ID|SOURCE_TYPE|TRN_NO|DATE1|DATE2|BRANCH|REF1|REF2|REF3|REF4|REF5|REF6|DC_FLAG|AMOUNT|CURRENCY
    1|S1|55|19-Oct-2015|19-Oct-2015|25602|999|9999|41106|47311|379|99999|004|999|9999|Cr|2672.000000|INR
    2|S1|55|19-Oct-2015|19-Oct-2015|81201|999|9999|41106|99999|379|99999|004|999|9999|Dr|2672.000000|INR
    3|S2|55|19-OCT-2015|19-OCT-2015|81201|999|9999|41106|99999|379|99999|004|999|9999|DR|2672|INR
    4|S2|55|19-OCT-2015|19-OCT-2015|25602|999|9999|41106|47311|379|99999|004|999|9999|CR|2672|INR

I have a requirement to link similar records (same dates, branch and reference numbers) across the two sources and assign each linked pair a unique ID. For example, records 1 and 4 above should be linked with the same ID.

I've written the code below to split the data by source and join the two sides on the matching columns, but I don't know how to proceed further.

    var hc = new org.apache.spark.sql.hive.HiveContext(sc);
    var src = hc.sql("select RECORD_ID,SOURCE_TYPE,TRN_NO,DATE1,DATE2,BRANCH,REF1,REF2,REF3,REF4,REF5,REF6,DC_FLAG,AMOUNT,CURRENCY from src_table");
    var s1 = src.filter("source_type='S1'");
    var s2 = src.filter("source_type='S2'");
    var src_join = s1.as("S1").join(s2.as("S2")).filter("(S1.TRN_NO= S2.TRN_NO) and (S1.DATE1= S2.DATE1) and (S1.DATE2= S2.DATE2) and (S1.BRANCH= S2.BRANCH) and (S1.REF1= S2.REF1) and (S1.REF2= S2.REF2) and (S1.REF3= S2.REF3) and (S1.REF4= S2.REF4) and (S1.REF5= S2.REF5) and (S1.REF6= S2.REF6) and (S1.CURRENCY= S2.CURRENCY)");

I tried using a UDF that returns a random value, or a hashed string built from the record IDs of both sides, and adding it to the schema with withColumn, but I ended up with duplicate link IDs. Also, when I use a UDF I'm not able to refer to the columns by their alias in the next steps.
For example, if I create a new DF using the line below:

    var src_link = src_join.as("SJ").withColumn("LINK_ID", linkIDUDF(src_join("S1.RECORD_ID"), src_join("S2.RECORD_ID")));

then in later lines I'm not able to refer to the "S1" columns from "src_link", like this:

    var src_link_s1 = src_link.as("SL").select($"S1.RECORD_ID");

Please guide me.

Regards,
Sarath.
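
One possible direction, offered only as a minimal sketch: derive the link ID from the matching columns themselves instead of from a random value, so that both sides of a pair compute the same ID independently. The plain-Scala example below illustrates the idea without Spark. The names `LinkIds`, `Rec`, `normalise`, `linkId` and `link` are hypothetical, the key is shortened to five of the matching columns for readability, and upper-casing the fields is an assumption made because the sample dates differ in case between S1 and S2 ("19-Oct-2015" vs "19-OCT-2015").

```scala
import java.security.MessageDigest

object LinkIds {
  // Hypothetical record shape for illustration: the record ID, its source,
  // and the values of the columns used for matching.
  final case class Rec(recordId: Int, sourceType: String, key: Seq[String])

  // Assumption: matching should ignore case and surrounding whitespace,
  // so "19-Oct-2015" and "19-OCT-2015" compare equal.
  def normalise(key: Seq[String]): Seq[String] = key.map(_.trim.toUpperCase)

  // Deterministic ID: SHA-1 hex digest of the normalised key fields joined with '|'.
  def linkId(key: Seq[String]): String = {
    val bytes = MessageDigest.getInstance("SHA-1")
      .digest(normalise(key).mkString("|").getBytes("UTF-8"))
    bytes.map("%02x".format(_)).mkString
  }

  // Keep only keys that occur in both sources and map each record ID to its link ID.
  def link(recs: Seq[Rec]): Map[Int, String] =
    recs.groupBy(r => linkId(r.key)).toSeq
      .filter { case (_, group) => group.map(_.sourceType).toSet == Set("S1", "S2") }
      .flatMap { case (id, group) => group.map(r => r.recordId -> id) }
      .toMap
}

object LinkIdsDemo extends App {
  // Shortened keys (TRN_NO, DATE1, DATE2, BRANCH, CURRENCY) taken from the sample rows.
  val recs = Seq(
    LinkIds.Rec(1, "S1", Seq("55", "19-Oct-2015", "19-Oct-2015", "25602", "INR")),
    LinkIds.Rec(2, "S1", Seq("55", "19-Oct-2015", "19-Oct-2015", "81201", "INR")),
    LinkIds.Rec(3, "S2", Seq("55", "19-OCT-2015", "19-OCT-2015", "81201", "INR")),
    LinkIds.Rec(4, "S2", Seq("55", "19-OCT-2015", "19-OCT-2015", "25602", "INR"))
  )
  // Print each record ID with the link ID it was assigned.
  LinkIds.link(recs).toSeq.sortBy(_._1).foreach { case (rid, id) => println(s"$rid -> $id") }
}
```

Note that with this scheme every record sharing the same key gets the same ID, so if one source contains several records with identical matching columns they would all be linked together; whether that is acceptable depends on the data. In Spark the same idea should be expressible with the built-in column functions `sha1`, `concat_ws` and `upper` from `org.apache.spark.sql.functions` rather than a custom UDF, though that is untested here. On the alias problem, one thing worth checking is whether calling `.as("SJ")` or `.as("SL")` on the joined DataFrame replaces the `S1`/`S2` qualifiers; renaming the columns before the join (for example to a hypothetical `S1_RECORD_ID`) would avoid depending on the qualifiers at all.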