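Hi,

As I understand it, it is expected that dimension-table rows inserted later are not matched against earlier records. A lookup join (`for system_time as of ta.proc`) queries the dimension table at the moment each record is processed, so records that arrived before the account existed are not joined again. To get a count of 3 for AAAA, you would have to manually rerun the historical data and then update the existing results.
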
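To illustrate the point, here is a rough sketch in plain Java. It is not Flink code, only a simulation of the behavior as I understand it: the class `LookupJoinSketch`, the in-memory `dim` map standing in for the MySQL table, and the `history` list standing in for retained Kafka data are all hypothetical names made up for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, not Flink code. All names here are hypothetical.
class LookupJoinSketch {
    // One trade record from the stream.
    static final class Trade {
        final String account;
        final long amount;
        final long count;

        Trade(String account, long amount, long count) {
            this.account = account;
            this.amount = amount;
            this.count = count;
        }
    }

    // Stand-in for the MySQL dimension table: account -> company.
    static final Map<String, String> dim = new HashMap<>();
    // Stand-in for retained stream history, needed only for the manual rerun.
    static final List<Trade> history = new ArrayList<>();
    // Running totals per company: {amount, count}.
    static final Map<String, long[]> totals = new LinkedHashMap<>();

    // Looks up the dimension table as it is right now, then aggregates.
    static void process(Trade t) {
        String company = dim.get(t.account);
        String key = (company == null) ? "(unmatched)" : company;
        long[] sum = totals.computeIfAbsent(key, k -> new long[2]);
        sum[0] += t.amount;
        sum[1] += t.count;
    }

    // A record arriving from the stream: remember it, then join it exactly once.
    static void onRecord(String account, long amount, long count) {
        Trade t = new Trade(account, amount, count);
        history.add(t);
        process(t);
    }

    // Manual backfill: drop the totals and rerun all history against the current table.
    static void replay() {
        totals.clear();
        for (Trade t : history) {
            process(t);
        }
    }

    static String show() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            sb.append(e.getKey()).append(": amount=").append(e.getValue()[0])
              .append(", count=").append(e.getValue()[1]).append("; ");
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        dim.put("2222", "BBBB");
        onRecord("1111", 100, 1);  // account not in the table yet: unmatched
        onRecord("1111", 100, 1);  // still unmatched
        dim.put("1111", "AAAA");   // account inserted late
        onRecord("1111", 100, 1);  // matched
        System.out.println(show()); // (unmatched): amount=200, count=2; AAAA: amount=100, count=1;
        replay();
        System.out.println(show()); // AAAA: amount=300, count=3;
    }
}
```

The first two records are joined before account 1111 exists, so they stay in the unmatched group even after the row is inserted. Only rerunning the history against the updated table moves them to AAAA.
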
Thanks

On 2022-11-11 11:10:03, "Jason_H" <hyb_he...@163.com> wrote:
>
> Hi everyone,
>
> I am using Flink SQL to implement a dimension-table join. The data source is Kafka (transaction data) and the dimension table is in MySQL (accounts). I have run into a problem: when a record arrives from Kafka and its account is not found in the dimension table, I insert that account manually, and the next record then matches the account information. However, the accumulated output is missing the records that did not match. For example:
>
> Kafka input:
> account  amount  count
> 1111     100     1      -> not matched
> 1111     100     1      -> not matched
> 1111     100     1      -> matched
>
> Dimension table:
> account  company
> 2222     BBBB
> 1111     AAAA           -> account inserted later
>
> Actual output:
> company  amount  count
> AAAA     100     1
>
> The result I want:
> company  amount  count
> AAAA     300     3
>
> The SQL is as follows:
>
> String sql2 = "insert into dws_b2b_trade_year_index\n" +
>         "WITH temp AS (\n" +
>         "select \n" +
>         "  ta.gmtStatistical as gmtStatistical,\n" +
>         "  ta.paymentMethod as paymentMethod,\n" +
>         "  tb.CORP_ID as outCorpId,\n" +
>         "  tc.CORP_ID as inCorpId,\n" +
>         "  sum(ta.tradeAmt) as tranAmount,\n" +
>         "  sum(ta.tradeCnt) as tranNum \n" +
>         "from dws_a2a_trade_year_index ta \n" +
>         "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
>         "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
>         "group by \n" +
>         "  ta.gmtStatistical, \n" +
>         "  ta.paymentMethod, \n" +
>         "  tb.CORP_ID, \n" +
>         "  tc.CORP_ID \n" +
>         ") \n" +
>         "SELECT \n" +
>         "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
>         "  gmtStatistical, \n" +
>         "  paymentMethod, \n" +
>         "  outCorpId, \n" +
>         "  inCorpId, \n" +
>         "  tranAmount, \n" +
>         "  tranNum \n" +
>         "FROM temp";
>
> Jason_H
> hyb_he...@163.com