I want to join two tables on two columns, like this:

//    AND sr_customer_sk      = ws_bill_customer_sk
//    AND sr_item_sk          = ws_item_sk
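
To make the intended result concrete, the two conditions above amount to the following, sketched with plain Scala collections rather than Flink. The case classes and sample rows are hypothetical and carry only the fields used in this message.

```scala
// Hypothetical, trimmed-down records: only the fields used in this message.
case class StoreReturn(_item_sk: Long, _customer_sk: Long, _ticket_number: Long)
case class WebSales(_item_sk: Long, _bill_customer_sk: Long)

object JoinSketch {
  // Made-up sample rows, for illustration only.
  val storeReturn = List(
    StoreReturn(1L, 10L, 100L),
    StoreReturn(2L, 20L, 200L)
  )
  val webSales = List(
    WebSales(1L, 10L), // matches on both columns
    WebSales(2L, 99L)  // same item, different customer: no match
  )

  // Equi-join on the composite key (item, customer).
  val joined: List[(Long, Long, Long)] =
    for {
      sr <- storeReturn
      ws <- webSales
      if sr._customer_sk == ws._bill_customer_sk && sr._item_sk == ws._item_sk
    } yield (sr._item_sk, sr._customer_sk, sr._ticket_number)

  def main(args: Array[String]): Unit = joined.foreach(println)
}
```

The Flink attempt at the same join follows.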

val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){
    (storeReturn: StoreReturn, webSales: WebSales, out: Collector[(Long,Long,Long)]) =>
      if(storeReturn._customer_sk.equals(webSales._bill_customer_sk))
        out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number)
      else
        None
}

According to the explanation of the join phase, this is how I should
write such a join. Is that right?

However, it does not compile. I get a type mismatch: expected
TypeInformation[Long], actual (StoreReturn, WebSales,
Collector[(Long,Long,Long)]) => Any

I have tried many variations, but none of them work.

Any suggestions?

Best Regards,

Phil
