If the dob_dim_account dimension table uses the jdbc connector, Flink reads all of its data once at initialization;
subsequent updates in the database do not trigger any Flink computation.

To solve this, dob_dim_account has to become a streaming table.
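A minimal sketch of what "make it a streaming table" would mean for the query in this thread, assuming dob_dim_account has already been redeclared as a changelog source (for example via CDC). The two `for system_time as of ta.proc` lookup joins become regular joins. The class name `RegularJoinQuery` is hypothetical; it only builds the SQL string, which would then be passed to something like `tEnv.executeSql(...)`.

```java
// Hypothetical rewrite of the query from this thread: the two
// "for system_time as of ta.proc" lookup joins become regular joins.
// Assumes dob_dim_account is declared as a changelog (streaming) table.
public class RegularJoinQuery {

    static String buildSql() {
        return "insert into dws_b2b_trade_year_index\n"
            + "WITH temp AS (\n"
            + "select\n"
            + "  ta.gmtStatistical as gmtStatistical,\n"
            + "  ta.paymentMethod as paymentMethod,\n"
            + "  tb.CORP_ID as outCorpId,\n"
            + "  tc.CORP_ID as inCorpId,\n"
            + "  sum(ta.tradeAmt) as tranAmount,\n"
            + "  sum(ta.tradeCnt) as tranNum\n"
            + "from dws_a2a_trade_year_index ta\n"
            // Regular joins keep both inputs in state, so a late account row
            // retracts the earlier NULL-joined rows and re-emits them with CORP_ID.
            + "left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE\n"
            + "left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE\n"
            + "group by ta.gmtStatistical, ta.paymentMethod, tb.CORP_ID, tc.CORP_ID\n"
            + ")\n"
            + "SELECT\n"
            + "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate,\n"
            + "  gmtStatistical, paymentMethod, outCorpId, inCorpId, tranAmount, tranNum\n"
            + "FROM temp";
    }

    public static void main(String[] args) {
        System.out.println(buildSql());
    }
}
```

Two trade-offs come with this: regular joins keep both inputs in state indefinitely unless a state TTL is configured (the `table.exec.state.ttl` option), and the sink table must accept update/retract messages, because earlier results get corrected when a late account row arrives.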


Zhiwen Sun



On Thu, Nov 17, 2022 at 1:56 PM Jason_H <hyb_he...@163.com> wrote:

> Hi,
> This approach requires CDC, but in our current plan our lead does not want to use CDC; we only want to solve this with Flink SQL.
>
>
> Jason_H
> hyb_he...@163.com
> ---- Replied Message ----
> From: 任召金 <renzhao...@100.me>
> Date: 11/15/2022 09:52
> To: user-zh <user-zh@flink.apache.org>
> Subject: Re: flinksql join
> Hello, you could try turning the MySQL data into a stream with CDC and then inner-joining it with the main stream. Watch out for the state TTL.
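A minimal sketch of the CDC suggestion, assuming the `mysql-cdc` connector from flink-cdc-connectors is on the classpath. The class name, host, credentials, database and MySQL table name are placeholders, the column types are assumed to be STRING, and ACCT_CODE is assumed to be the MySQL table's key. The TTL advice would correspond to the `table.exec.state.ttl` option, which lets Flink drop join state that has not been accessed for the configured duration; expiring the account side too early would make later trades miss their match again.

```java
// Hypothetical DDL builder: declares dob_dim_account as a changelog table read
// from the MySQL binlog via the mysql-cdc connector. All connection values are
// placeholders supplied by the caller; column types are assumed to be STRING.
public class CdcDimTableDdl {

    static String buildDdl(String host, int port, String user, String password,
                           String database, String table) {
        return "CREATE TABLE dob_dim_account (\n"
            + "  ACCT_CODE STRING,\n"
            + "  CORP_ID STRING,\n"
            + "  PRIMARY KEY (ACCT_CODE) NOT ENFORCED\n"   // assumed key of the MySQL table
            + ") WITH (\n"
            + "  'connector' = 'mysql-cdc',\n"
            + "  'hostname' = '" + host + "',\n"
            + "  'port' = '" + port + "',\n"
            + "  'username' = '" + user + "',\n"
            + "  'password' = '" + password + "',\n"
            + "  'database-name' = '" + database + "',\n"
            + "  'table-name' = '" + table + "'\n"
            + ")";
    }

    public static void main(String[] args) {
        // Placeholder values only; replace with the real connection settings.
        System.out.println(buildDdl("mysql-host", 3306, "flink", "secret", "your_db", "dob_dim_account"));
    }
}
```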
> ------------------ Original ------------------
> From: "Jason_H" <hyb_he...@163.com>
> Date: Tue, Nov 15, 2022 09:46 AM
> To: "Flink Chinese mailing list" <user-zh@flink.apache.org>
> Subject: Re: flinksql join
>
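> Hi,
> I'd like to handle this with Flink SQL's existing join: when the dimension table is updated late, the fact records should be kept in state and wait for it.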
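The behaviour described here (facts waiting in state until the dimension row shows up) is roughly what a regular, non-temporal join does. Below is a plain-Java simulation with no Flink dependency, using the sample data from the original question; `BufferedJoinDemo` and its methods are hypothetical names. It is simplified: a real regular join keeps every row of both sides in state, while this model only buffers the unmatched trades, and it uses inner-join semantics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a regular (stateful) inner join: trades with no matching
// account are buffered until the account row arrives, then joined and counted.
public class BufferedJoinDemo {

    record Trade(String account, long amount, long count) {}

    private final Map<String, String> accountToCorp = new HashMap<>();   // dimension-side state
    private final Map<String, List<Trade>> pending = new HashMap<>();    // unmatched trades waiting
    private final Map<String, long[]> totals = new HashMap<>();          // corp -> {amount, count}

    void onTrade(Trade t) {
        String corp = accountToCorp.get(t.account());
        if (corp == null) {
            pending.computeIfAbsent(t.account(), k -> new ArrayList<>()).add(t);  // wait in state
        } else {
            emit(corp, t);
        }
    }

    void onAccount(String account, String corp) {
        accountToCorp.put(account, corp);
        List<Trade> waiting = pending.remove(account);
        if (waiting != null) {
            for (Trade t : waiting) {
                emit(corp, t);   // late match: buffered trades are joined now
            }
        }
    }

    private void emit(String corp, Trade t) {
        long[] agg = totals.computeIfAbsent(corp, k -> new long[2]);
        agg[0] += t.amount();
        agg[1] += t.count();
    }

    String result(String corp) {
        long[] agg = totals.getOrDefault(corp, new long[2]);
        return corp + " " + agg[0] + " " + agg[1];
    }

    public static void main(String[] args) {
        BufferedJoinDemo join = new BufferedJoinDemo();
        join.onAccount("2222", "BBBB");
        join.onTrade(new Trade("1111", 100, 1));
        join.onTrade(new Trade("1111", 100, 1));
        join.onAccount("1111", "AAAA");           // account inserted late
        join.onTrade(new Trade("1111", 100, 1));
        System.out.println(join.result("AAAA"));  // AAAA 300 3
    }
}
```

The cost of this design is that the buffer (and, in Flink, the join state) grows until either the account arrives or a state TTL evicts it.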
>
>
> ---- Replied Message ----
> From: RS <tinyshr...@163.com>
> Date: 11/15/2022 09:07
> To: user-zh@flink.apache.org
> Subject: Re: flinksql join
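> Hi,
> My understanding is that it is expected that dimension rows inserted later are not matched.
> If you want AAAA to reach a count of 3, you would have to manually re-run the historical data and then update the existing results.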
>
>
> Thanks
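The "re-run the history" approach amounts to recomputing the aggregate over all stored trades against the dimension table as it is now. A plain-Java sketch of that recomputation, using the sample data from the original question; `BackfillDemo` and `recompute` are hypothetical names, and trades that still have no account are simply skipped (inner-join semantics).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Batch-style backfill: re-aggregate the full trade history against the
// current dimension table, so trades that arrived before their account count too.
public class BackfillDemo {

    record Trade(String account, long amount, long count) {}

    static Map<String, long[]> recompute(List<Trade> history, Map<String, String> accounts) {
        Map<String, long[]> totals = new LinkedHashMap<>();   // corp -> {amount, count}
        for (Trade t : history) {
            String corp = accounts.get(t.account());
            if (corp == null) {
                continue;   // still unmatched: skipped
            }
            long[] agg = totals.computeIfAbsent(corp, k -> new long[2]);
            agg[0] += t.amount();
            agg[1] += t.count();
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Trade> history = List.of(
            new Trade("1111", 100, 1), new Trade("1111", 100, 1), new Trade("1111", 100, 1));
        Map<String, String> accounts = Map.of("2222", "BBBB", "1111", "AAAA");
        long[] aaaa = recompute(history, accounts).get("AAAA");
        System.out.println("AAAA " + aaaa[0] + " " + aaaa[1]);   // AAAA 300 3
    }
}
```

The results of such a backfill would then overwrite the rows already written by the streaming job.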
>
> On 2022-11-11 11:10:03, "Jason_H" <hyb_he...@163.com> wrote:
>
>
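> Hi everyone,
>
> I'm using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). I've hit a problem: when a Kafka record arrives and its account is not in the dimension table, I insert the account manually. The next record then matches the account, but the accumulated output is missing the records that did not match. For example: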
> Kafka input:
> Account  Amount  Count
> 1111     100     1      -> not matched
> 1111     100     1      -> not matched
> 1111     100     1      -> matched
>
> Dimension table:
> Account  Company
> 2222     BBBB
> 1111     AAAA     -> account inserted later
>
> Actual output:
> Company  Amount  Count
> AAAA     100     1
>
> Desired output:
> Company  Amount  Count
> AAAA     300     3
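The actual output above is what a processing-time lookup join (`FOR SYSTEM_TIME AS OF ta.proc`) produces: each trade is looked up against the dimension table as it is at arrival time, and that result is never revisited when the table changes later. A plain-Java model of this, simplified to a single account column instead of the query's two (out/in account); `LookupJoinDemo` is a hypothetical name, and it ignores any lookup caching the real JDBC source may do.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a processing-time LEFT lookup join followed by an aggregate:
// the dimension table is consulted once per trade, at arrival.
public class LookupJoinDemo {

    private final Map<String, String> dim = new HashMap<>();      // current dimension table content
    private final Map<String, long[]> totals = new HashMap<>();   // corp (or "NULL") -> {amount, count}

    void upsertAccount(String account, String corp) {
        dim.put(account, corp);   // only affects trades that arrive afterwards
    }

    void onTrade(String account, long amount, long count) {
        String corp = dim.getOrDefault(account, "NULL");   // LEFT JOIN: unmatched -> NULL group
        long[] agg = totals.computeIfAbsent(corp, k -> new long[2]);
        agg[0] += amount;
        agg[1] += count;
    }

    String result(String corp) {
        long[] agg = totals.getOrDefault(corp, new long[2]);
        return corp + " " + agg[0] + " " + agg[1];
    }

    public static void main(String[] args) {
        LookupJoinDemo join = new LookupJoinDemo();
        join.upsertAccount("2222", "BBBB");
        join.onTrade("1111", 100, 1);           // no account yet -> NULL group
        join.onTrade("1111", 100, 1);           // no account yet -> NULL group
        join.upsertAccount("1111", "AAAA");     // account inserted manually
        join.onTrade("1111", 100, 1);           // matched
        System.out.println(join.result("AAAA"));   // AAAA 100 1
        System.out.println(join.result("NULL"));   // NULL 200 2
    }
}
```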
>
> The SQL is as follows:
> String sql2 = "insert into dws_b2b_trade_year_index\n" +
>     "WITH temp AS (\n" +
>     "select \n" +
>     "  ta.gmtStatistical as gmtStatistical,\n" +
>     "  ta.paymentMethod as paymentMethod,\n" +
>     "  tb.CORP_ID as outCorpId,\n" +
>     "  tc.CORP_ID as inCorpId,\n" +
>     "  sum(ta.tradeAmt) as tranAmount,\n" +
>     "  sum(ta.tradeCnt) as tranNum \n" +
>     "from dws_a2a_trade_year_index ta \n" +
>     // Processing-time lookup joins: each trade sees dob_dim_account as of its arrival
>     "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
>     "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
>     "group by \n" +
>     " ta.gmtStatistical, \n" +
>     " ta.paymentMethod, \n" +
>     " tb.CORP_ID, \n" +
>     " tc.CORP_ID \n" +
>     ") \n" +
>     "SELECT \n" +
>     "  DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
>     "  gmtStatistical, \n" +
>     "  paymentMethod, \n" +
>     "  outCorpId, \n" +
>     "  inCorpId, \n" +
>     "  tranAmount, \n" +
>     "  tranNum \n" +
>     "FROM temp";
>
