Hi,
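My understanding is that this is expected: dimension-table rows inserted later are not matched by records that were already processed, because each record looks up the dimension table only once, when it arrives.
If you want the AAAA total to reach 3 transactions, you would need to manually rerun the historical data and then update the existing results.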
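To illustrate the point above, here is a minimal plain-Java sketch. It is not Flink API, and the class `LookupJoinDemo` and its methods are hypothetical. It assumes processing-time lookup semantics: each trade reads the dimension table once, at the moment it arrives, and results that were already emitted are never revisited. Following the LEFT JOIN in the quoted SQL, the sketch keeps an unmatched trade with a NULL corp instead of dropping it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupJoinDemo {

    /** One Kafka trade record (account, amount, count), as in the quoted example. */
    static final class Trade {
        final String account;
        final long amount;
        final long count;

        Trade(String account, long amount, long count) {
            this.account = account;
            this.amount = amount;
            this.count = count;
        }
    }

    /** Adds one joined row to the per-corp totals; a null corp mimics the NULL a LEFT JOIN pads in. */
    static void accumulate(Map<String, long[]> totals, String corp, Trade t) {
        long[] acc = totals.get(corp);
        if (acc == null) {
            acc = new long[2]; // [sum(amount), sum(count)]
            totals.put(corp, acc);
        }
        acc[0] += t.amount;
        acc[1] += t.count;
    }

    /**
     * Processing-time lookup (assumed semantics): each trade sees the dimension table as it is
     * when the trade arrives. The late row lateAccount -> lateCorp is inserted just before
     * the trade at 0-based index insertAt is processed.
     */
    static Map<String, long[]> lookupOnArrival(List<Trade> trades, Map<String, String> initialDim,
                                               String lateAccount, String lateCorp, int insertAt) {
        Map<String, String> dim = new HashMap<>(initialDim);
        Map<String, long[]> totals = new HashMap<>();
        for (int i = 0; i < trades.size(); i++) {
            if (i == insertAt) {
                dim.put(lateAccount, lateCorp); // the manual insert into the MySQL table
            }
            Trade t = trades.get(i);
            accumulate(totals, dim.get(t.account), t); // earlier trades are never re-joined
        }
        return totals;
    }

    /** Rerunning the history: join every stored trade against the current dimension snapshot. */
    static Map<String, long[]> rerunAgainst(List<Trade> trades, Map<String, String> currentDim) {
        Map<String, long[]> totals = new HashMap<>();
        for (Trade t : trades) {
            accumulate(totals, currentDim.get(t.account), t);
        }
        return totals;
    }

    static void print(String label, Map<String, long[]> totals) {
        totals.forEach((corp, acc) ->
                System.out.println(label + ": corp=" + corp + " amount=" + acc[0] + " count=" + acc[1]));
    }

    public static void main(String[] args) {
        List<Trade> trades = List.of(
                new Trade("1111", 100, 1),
                new Trade("1111", 100, 1),
                new Trade("1111", 100, 1));
        Map<String, String> dim = Map.of("2222", "BBBB");

        // Account 1111 -> AAAA is inserted after the first two trades were already processed.
        print("lookup on arrival", lookupOnArrival(trades, dim, "1111", "AAAA", 2));

        Map<String, String> current = new HashMap<>(dim);
        current.put("1111", "AAAA");
        print("rerun after insert", rerunAgainst(trades, current));
    }
}
```

With the example data, the arrival-time run produces AAAA 100/1 plus a separate NULL-corp group holding 200/2. Re-joining the full history against the current dimension table produces AAAA 300/3, which is the result you want.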


Thanks






On 2022-11-11 11:10:03, "Jason_H" <hyb_he...@163.com> wrote:
>
>
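>Hi everyone,
>I'm using Flink SQL to implement a dimension-table (lookup) join. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). I've hit a problem: when a record arrives from Kafka whose account is not yet in the dimension table, I insert that account manually. The next record that arrives then matches the account information, but the accumulated output is missing the earlier records that did not match. For example:
>Kafka input:
>account  amount  count
>1111     100     1  -> not matched
>1111     100     1  -> not matched
>1111     100     1  -> matched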
>
>Dimension table:
>account  company
>2222     BBBB
>1111     AAAA   -> account inserted later
>Actual output:
>company  amount  count
>AAAA 100   1
>
>
>Expected output:
>company  amount  count
>AAAA 300   3
>
>
>
>
>
>The SQL is as follows:
>String sql2 =  "insert into dws_b2b_trade_year_index\n" +
>               "WITH temp AS (\n" +
>               "select \n" +
>               "  ta.gmtStatistical as gmtStatistical,\n" +
>               "  ta.paymentMethod as paymentMethod,\n" +
>               "  tb.CORP_ID as outCorpId,\n" +
>               "  tc.CORP_ID as inCorpId,\n" +
>               "  sum(ta.tradeAmt) as tranAmount,\n" +
>               "  sum(ta.tradeCnt) as tranNum \n" +
>               "from dws_a2a_trade_year_index ta \n" +
>               "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
>               "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
>               "group by \n" +
>               " ta.gmtStatistical, \n" +
>               " ta.paymentMethod, \n" +
>               " tb.CORP_ID, \n" +
>               " tc.CORP_ID \n" +
>               ") \n" +
>               "SELECT \n" +
>               "   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
>               "   gmtStatistical, \n" +
>               "   paymentMethod, \n" +
>               "   outCorpId, \n" +
>               "   inCorpId, \n" +
>               "   tranAmount, \n" +
>               "   tranNum \n" +
>               "FROM temp";
>
>Jason_H
>hyb_he...@163.com
