dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据, 后续数据库中更新并不会触发 flink 计算。
要解决这个问题, dob_dim_account 需要变成流表。 Zhiwen Sun On Thu, Nov 17, 2022 at 1:56 PM Jason_H <hyb_he...@163.com> wrote: > hi,你好 > 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 > > > | | > Jason_H > | > | > hyb_he...@163.com > | > ---- Replied Message ---- > | From | 任召金<renzhao...@100.me> | > | Date | 11/15/2022 09:52 | > | To | user-zh<user-zh@flink.apache.org> | > | Subject | Re: flinksql join | > hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL > > > ------------------ Original ------------------ > From: "Jason_H"<hyb_he...@163.com>; > Date: Tue, Nov 15, 2022 09:46 AM > To: "flink中文邮件组"<user-zh@flink.apache.org>; > > Subject: Re: flinksql join > > > > hi,你好 > 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。 > > > | | > Jason_H > | > | > hyb_he...@163.com > | > ---- Replied Message ---- > | From | RS<tinyshr...@163.com> | > | Date | 11/15/2022 09:07 | > | To | user-zh@flink.apache.org<user-zh@flink.apache.org> | > | Subject | Re:flinksql join | > Hi, > 我的理解是后插入的维表数据,关联不到是正常现象, > 如果要实现AAAA=3的话,应该要手动重新跑历史数据,然后更新现有数据, > > > Thanks > > > > > > > 在 2022-11-11 11:10:03,"Jason_H" <hyb_he...@163.com> 写道: > > > hi,大家好 > > 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: > kakfa输入: > 账号 金额 笔数 > 1111 100 1 -> 未匹配 > 1111 100 1 -> 未匹配 > 1111 100 1 -> 匹配上 > > 维表 > 账号 企业 > 2222 BBBB > 1111 AAAA -> 后插入的账号信息 > 实际输出结果 > 企业 金额 笔数 > AAAA 100 1 > > > 我想要的结果: > 企业 金额 笔数 > AAAA 300 3 > > > > > > sql如下: > String sql2 = "insert into dws_b2b_trade_year_index\n" + > "WITH temp AS (\n" + > "select \n" + > " ta.gmtStatistical as gmtStatistical,\n" + > " ta.paymentMethod as paymentMethod,\n" + > " tb.CORP_ID as outCorpId,\n" + > " tc.CORP_ID as inCorpId,\n" + > " sum(ta.tradeAmt) as tranAmount,\n" + > " sum(ta.tradeCnt) as tranNum \n" + > "from dws_a2a_trade_year_index ta \n" + > "left join dob_dim_account for system_time as of ta.proc as tb on > ta.outAcctCode = tb.ACCT_CODE \n" + > "left join dob_dim_account for system_time as of ta.proc as tc on > ta.inAcctCode = tc.ACCT_CODE \n" + > "group by \n" + > " ta.gmtStatistical, \n" + > " ta.paymentMethod, \n" + > " tb.CORP_ID, \n" + > " tc.CORP_ID \n" + > ") \n" + > "SELECT \n" + > " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as > gmtUpdate, \n" + > " gmtStatistical, \n" + > " paymentMethod, \n" + > " outCorpId, \n" + > " inCorpId, \n" + > " tranAmount, \n" + > " tranNum \n" + > "FROM temp"; > > | | > Jason_H > | > | > hyb_he...@163.com > |