I want to join two tables with two columns like // AND sr_customer_sk = ws_bill_customer_sk // AND sr_item_sk = ws_item_sk
val srJoinWs = storeReturn.join(webSales).where(_._item_sk).equalTo(_._item_sk){ (storeReturn: StoreReturn, webSales: WebSales, out: Collector[(Long,Long,Long)]) => if(storeReturn._customer_sk.equals(webSales._bill_customer_sk)) out.collect(storeReturn._item_sk,storeReturn._customer_sk,storeReturn._ticket_number) else None } According to the explaination from join phase, I should do like it if I want to join like the way. Isn't it right? But the thing is it does not work in that Type dismatch; expected TypeInformation[Long], actual(StoreReturn, WebSales, Collector[(Long,Long,Long)]) => Any I tried many ways but it still does not work. Any suggestion? Best Regards, Phil