I tried using a regular join:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";
This is how I create the MySQL table:
String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
"    DIM_ACCOUNT_ID  string ,\n" +
"    GMT_CREATE  string ,\n" +
"    ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
"    GMT_UPDATE  string ,\n" +
"    ACCT_CODE  string ,\n" +
"    CUST_ID  string ,\n" +
"    CUST_NAME  string ,\n" +
"    CORP_ID  string ,\n" +
"    CORP_CERT_CODE  string ,\n" +
"    CORP_CERT_TYPE  string ,\n" +
"    CUST_MANAGER_JOB_CODE  string ,\n" +
"    TEAM_CODE  string ,\n" +
"    ORG_ID  string, \n" +
"    SUPER_ORG_ID  string, \n" +
"    IS_OUTSIDE  BIGINT \n" +
") \n" +
"WITH (\n" +
"  'connector' = 'jdbc',\n" +
"  'url' = '***',\n" +
"  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"  'username' = 'root',\n" +
"  'password' = '123456',\n" +
//"  'lookup.cache.ttl' = '1s', \n" +
"  'table-name' = 'dob_dim_account' \n" +
//"  'lookup.cache.max-rows' = '1000' \n" +
//"  'lookup.cache.ttl' = '1 minute',\n" +
//"  'lookup.max-retries' = '3' \n" +
" )";


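But this approach doesn't seem to solve my earlier problem either, and when I add a new account, newly arriving data still can't be matched to it.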
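A plausible explanation for the second symptom, offered as an assumption rather than a confirmed diagnosis: when a table declared with 'connector' = 'jdbc' is used in a regular join instead of a lookup join, Flink reads it with a bounded scan, so the join only sees the rows that were in MySQL when the job started. The plain-Java model below illustrates that. BoundedScanSketch and the maps standing in for the MySQL table and the join state are hypothetical and are not Flink APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not Flink code: a bounded JDBC scan copies the
// MySQL table once at job start, and the join only ever sees that copy.
public class BoundedScanSketch {

    // Returns the corp id the join produces for a trade on `acct`
    // that arrives AFTER account 1111 was inserted into MySQL.
    static String corpForLateTrade(String acct) {
        Map<String, String> mysqlTable = new HashMap<>();
        mysqlTable.put("2222", "BBBB");

        // Job start: the scan reads a one-off snapshot into join state, then finishes.
        Map<String, String> joinState = new HashMap<>(mysqlTable);

        // Later, the missing account is inserted into MySQL...
        mysqlTable.put("1111", "AAAA");

        // ...but nothing re-reads the table, so a later trade still finds no match.
        return joinState.getOrDefault(acct, "NULL");
    }

    public static void main(String[] args) {
        System.out.println(corpForLateTrade("1111")); // NULL
        System.out.println(corpForLateTrade("2222")); // BBBB
    }
}
```

If that is the cause, the regular join can only pick up accounts inserted later when dob_dim_account reaches the join as an unbounded changelog stream (for example from a CDC-based source), which a plain JDBC scan does not provide.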



Jason_H
hyb_he...@163.com
---- Replied Message ----
| From | Jason_H<hyb_he...@163.com> |
| Date | 11/11/2022 14:42 |
| To | Flink Chinese mailing list<user-zh@flink.apache.org> |
| Subject | Re: flinksql join |
---- Replied Message ----
| From | Zhiwen Sun<pens...@gmail.com> |
| Date | 11/11/2022 14:08 |
| To | <user-zh@flink.apache.org> |
| Subject | Re: flinksql join |
Use a regular join, not a lookup join.
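Why a regular join can give the desired AAAA 300 3 can be sketched in plain Java. In a streaming regular LEFT JOIN, both inputs are kept in state: a trade with no matching account is first emitted with a NULL corp, and when the account row arrives later, that row is retracted and re-emitted with the match, and the downstream GROUP BY applies the retraction. This is a simplified model, not Flink's implementation; RegularJoinSketch, its methods, and the "NULL" group key are hypothetical, and it assumes the account table reaches the join as an unbounded changelog stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: a regular streaming LEFT JOIN that keeps
// both inputs in state, followed by a GROUP BY that accepts retractions.
public class RegularJoinSketch {

    record Trade(String acct, long amount, long count) {}

    private final Map<String, List<Trade>> tradeState = new HashMap<>(); // left side
    private final Map<String, String> accountState = new HashMap<>();    // right side
    private final Map<String, long[]> result = new HashMap<>();          // corp -> {amount, count}

    // Apply an insert (+1) or a retraction (-1) of one joined row to the aggregate.
    private void emit(String corp, Trade t, int sign) {
        long[] agg = result.computeIfAbsent(corp, k -> new long[2]);
        agg[0] += sign * t.amount();
        agg[1] += sign * t.count();
        if (agg[0] == 0 && agg[1] == 0) {
            result.remove(corp); // the group became empty
        }
    }

    void onTrade(Trade t) {
        tradeState.computeIfAbsent(t.acct(), k -> new ArrayList<>()).add(t);
        emit(accountState.getOrDefault(t.acct(), "NULL"), t, +1);
    }

    void onAccount(String acct, String corp) {
        String previous = accountState.put(acct, corp);
        String oldCorp = previous == null ? "NULL" : previous;
        // Re-join every stored trade for this account: retract the old row, emit the new one.
        for (Trade t : tradeState.getOrDefault(acct, List.of())) {
            emit(oldCorp, t, -1);
            emit(corp, t, +1);
        }
    }

    // Replays the example from the original question.
    static Map<String, long[]> run() {
        RegularJoinSketch join = new RegularJoinSketch();
        join.onAccount("2222", "BBBB");
        join.onTrade(new Trade("1111", 100, 1));
        join.onTrade(new Trade("1111", 100, 1));
        join.onAccount("1111", "AAAA"); // account inserted after two trades
        join.onTrade(new Trade("1111", 100, 1));
        return join.result;
    }

    public static void main(String[] args) {
        run().forEach((corp, agg) ->
                System.out.println(corp + " " + agg[0] + " " + agg[1])); // AAAA 300 3
    }
}
```

The trade-off is that both sides of the join stay in state, so without a state TTL (for example table.exec.state.ttl) the state grows with every trade and account.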

Zhiwen Sun



On Fri, Nov 11, 2022 at 11:10 AM Jason_H <hyb_he...@163.com> wrote:



Hi everyone,

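I'm using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). I've run into a problem: when data arrives from Kafka and its account is not found in the dimension table, I insert that account manually. The next record that arrives then matches the account information, but the cumulative output is missing the records that were not matched. For example: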
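Kafka input:
Account  Amount  Count
1111     100     1   -> not matched
1111     100     1   -> not matched
1111     100     1   -> matched

Dimension table:
Account  Corp
2222     BBBB
1111     AAAA   -> account inserted later

Actual output:
Corp  Amount  Count
AAAA  100     1

Desired output:
Corp  Amount  Count
AAAA  300     3

The SQL is as follows: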
String sql2 = "insert into dws_b2b_trade_year_index\n" +
"WITH temp AS (\n" +
"select \n" +
"  ta.gmtStatistical as gmtStatistical,\n" +
"  ta.paymentMethod as paymentMethod,\n" +
"  tb.CORP_ID as outCorpId,\n" +
"  tc.CORP_ID as inCorpId,\n" +
"  sum(ta.tradeAmt) as tranAmount,\n" +
"  sum(ta.tradeCnt) as tranNum \n" +
"from dws_a2a_trade_year_index ta \n" +
"left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
"left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
"group by \n" +
" ta.gmtStatistical, \n" +
" ta.paymentMethod, \n" +
" tb.CORP_ID, \n" +
" tc.CORP_ID \n" +
") \n" +
"SELECT \n" +
"   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
"   gmtStatistical, \n" +
"   paymentMethod, \n" +
"   outCorpId, \n" +
"   inCorpId, \n" +
"   tranAmount, \n" +
"   tranNum \n" +
"FROM temp";

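For comparison, the lookup join above (FOR SYSTEM_TIME AS OF ta.proc) queries MySQL once for each incoming record and keeps no state for it, so a miss is never revisited. The plain-Java model below replays the example from this message for the outgoing account only; LookupJoinSketch and the "NULL" group key are hypothetical names, not Flink code. In this model the first two trades end up under NULL (200, 2) and only the third reaches AAAA (100, 1), which matches the AAAA 100 1 in the actual output reported above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Flink code: a processing-time lookup join
// (FOR SYSTEM_TIME AS OF ta.proc) followed by GROUP BY corp id.
public class LookupJoinSketch {

    record Trade(String acct, long amount, long count) {}

    // Returns corp id -> {sum(amount), sum(count)}; unmatched trades use "NULL".
    static Map<String, long[]> run() {
        Map<String, String> mysqlAccounts = new HashMap<>();
        mysqlAccounts.put("2222", "BBBB");

        List<Trade> trades = List.of(
                new Trade("1111", 100, 1),
                new Trade("1111", 100, 1),
                new Trade("1111", 100, 1));

        Map<String, long[]> result = new HashMap<>();
        for (int i = 0; i < trades.size(); i++) {
            if (i == 2) {
                // Account 1111 is inserted into MySQL just before the third trade.
                mysqlAccounts.put("1111", "AAAA");
            }
            Trade t = trades.get(i);
            // Lookup: query the dimension table once, when the trade is processed.
            String corp = mysqlAccounts.getOrDefault(t.acct(), "NULL");
            long[] agg = result.computeIfAbsent(corp, k -> new long[2]);
            agg[0] += t.amount();
            agg[1] += t.count();
            // No state is kept for the trade, so this result is never revised.
        }
        return result;
    }

    public static void main(String[] args) {
        run().forEach((corp, agg) ->
                System.out.println(corp + " " + agg[0] + " " + agg[1]));
    }
}
```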