I tried using a regular join:

    String sql2 = "insert into dws_b2b_trade_year_index\n" +
            "WITH temp AS (\n" +
            "select \n" +
            " ta.gmtStatistical as gmtStatistical,\n" +
            " ta.paymentMethod as paymentMethod,\n" +
            " tb.CORP_ID as outCorpId,\n" +
            " tc.CORP_ID as inCorpId,\n" +
            " sum(ta.tradeAmt) as tranAmount,\n" +
            " sum(ta.tradeCnt) as tranNum \n" +
            "from dws_a2a_trade_year_index ta \n" +
            "left join dob_dim_account as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
            "left join dob_dim_account as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
            "group by \n" +
            " ta.gmtStatistical, \n" +
            " ta.paymentMethod, \n" +
            " tb.CORP_ID, \n" +
            " tc.CORP_ID \n" +
            ") \n" +
            "SELECT \n" +
            " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
            " gmtStatistical, \n" +
            " paymentMethod, \n" +
            " outCorpId, \n" +
            " inCorpId, \n" +
            " tranAmount, \n" +
            " tranNum \n" +
            "FROM temp";

This is how I created the MySQL table:

    String accountDimSource = "CREATE TABLE dob_dim_account (\n" +
            " DIM_ACCOUNT_ID string ,\n" +
            " GMT_CREATE string ,\n" +
            " ts_date AS TO_TIMESTAMP(GMT_CREATE), \n" +
            " GMT_UPDATE string ,\n" +
            " ACCT_CODE string ,\n" +
            " CUST_ID string ,\n" +
            " CUST_NAME string ,\n" +
            " CORP_ID string ,\n" +
            " CORP_CERT_CODE string ,\n" +
            " CORP_CERT_TYPE string ,\n" +
            " CUST_MANAGER_JOB_CODE string ,\n" +
            " TEAM_CODE string ,\n" +
            " ORG_ID string, \n" +
            " SUPER_ORG_ID string, \n" +
            " IS_OUTSIDE BIGINT \n" +
            ") \n" +
            "WITH (\n" +
            " 'connector' = 'jdbc',\n" +
            " 'url' = '***',\n" +
            " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
            " 'username' = 'root',\n" +
            " 'password' = '123456',\n" +
            //" 'lookup.cache.ttl' = '1s', \n" +
            " 'table-name' = 'dob_dim_account' \n" +
            //" 'lookup.cache.max-rows' = '1000' \n" +
            //" 'lookup.cache.ttl' = '1 minute',\n" +
            //" 'lookup.max-retries' = '3' \n" +
            " )";
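The behaviour of this regular join can be sketched without Flink. As far as I know, the JDBC connector's scan source is bounded: used in a regular join, dob_dim_account is read once when the job starts, so rows inserted into MySQL afterwards never reach the join. The plain-Java simulation below (RegularJoinSim and its methods are hypothetical names, not Flink APIs) models a streaming regular LEFT JOIN followed by GROUP BY with retractions, simplified to a single account column. Case 1 feeds only a startup snapshot that lacks account 1111. Case 2 assumes the dimension table arrives as a changelog stream (for example from a CDC source, which this thread does not set up), where a late insert retracts the earlier null-padded rows.

```java
import java.util.*;

// Plain-Java simulation (no Flink APIs) of a streaming regular LEFT JOIN
// followed by GROUP BY corp, with retractions. RegularJoinSim and its methods
// are hypothetical names used for illustration.
class RegularJoinSim {
    static final String NULL = "NULL"; // stands in for SQL NULL (no matching account)

    // Join state for the left side: every trade {amount, cnt} seen per account.
    private final Map<String, List<long[]>> trades = new HashMap<>();
    // Join state for the right side: only rows that reached the join as events.
    private final Map<String, String> dim = new HashMap<>();
    // Downstream GROUP BY: corp -> {sum(amount), sum(cnt), number of rows}.
    final Map<String, long[]> agg = new HashMap<>();

    private void accumulate(String corp, long[] t, int sign) {
        long[] s = agg.computeIfAbsent(corp, k -> new long[3]);
        s[0] += sign * t[0];
        s[1] += sign * t[1];
        s[2] += sign;
        if (s[2] == 0) {
            agg.remove(corp); // every row of this group has been retracted
        }
    }

    void onTrade(String acct, long amount, long cnt) {
        long[] t = {amount, cnt};
        trades.computeIfAbsent(acct, k -> new ArrayList<>()).add(t);
        accumulate(dim.getOrDefault(acct, NULL), t, +1);
    }

    // Assumes a pure insert (acct had no dimension row before): retract the
    // null-padded rows emitted earlier and re-emit them with the new corp.
    void onDimInsert(String acct, String corp) {
        dim.put(acct, corp);
        for (long[] t : trades.getOrDefault(acct, List.of())) {
            accumulate(NULL, t, -1);
            accumulate(corp, t, +1);
        }
    }

    static void print(String label, Map<String, long[]> groups) {
        groups.forEach((corp, s) ->
                System.out.println(label + ": " + corp + " " + s[0] + " " + s[1]));
    }

    public static void main(String[] args) {
        // Case 1: the dimension table is read once at startup, before 1111 exists.
        RegularJoinSim snapshot = new RegularJoinSim();
        snapshot.onDimInsert("2222", "BBBB");
        for (int i = 0; i < 3; i++) {
            snapshot.onTrade("1111", 100, 1);
        }
        print("snapshot", snapshot.agg); // the later MySQL insert never arrives

        // Case 2 (assumption): the dimension table arrives as a changelog stream.
        RegularJoinSim changelog = new RegularJoinSim();
        changelog.onDimInsert("2222", "BBBB");
        changelog.onTrade("1111", 100, 1);
        changelog.onTrade("1111", 100, 1);
        changelog.onDimInsert("1111", "AAAA");
        changelog.onTrade("1111", 100, 1);
        print("changelog", changelog.agg);
    }
}
```

Running main prints "snapshot: NULL 300 3" and "changelog: AAAA 300 3". In case 1 all three trades stay in the NULL group, which matches the observation that newly added accounts are never joined; in case 2 the late insert of 1111 retracts the two NULL rows and the final group is AAAA 300 3.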
However, this approach doesn't seem to solve my earlier problem either, and when a new account is added, newly arriving data can't be joined to it.

Jason_H
hyb_he...@163.com

---- Replied Message ----
From: Jason_H <hyb_he...@163.com>
Date: 11/11/2022 14:42
To: Flink Chinese mailing list <user-zh@flink.apache.org>
Subject: Re: flinksql join
---- Replied Message ----
From: Zhiwen Sun <pens...@gmail.com>
Date: 11/11/2022 14:08
To: <user-zh@flink.apache.org>
Subject: Re: flinksql join

Use a regular join, not a lookup join.

Zhiwen Sun

On Fri, Nov 11, 2022 at 11:10 AM Jason_H <hyb_he...@163.com> wrote:

Hi all,

I'm using Flink SQL to implement a dimension-table join. The source is Kafka (transaction data) and the dimension table is in MySQL (accounts). I've run into a problem: when a record arrives from Kafka whose account is not yet in the dimension table, I insert that account manually, and the next record that arrives does match the account information. However, the accumulated output is then missing the records that did not match. For example:

Kafka input:

  Account  Amount  Count
  1111     100     1      -> not matched
  1111     100     1      -> not matched
  1111     100     1      -> matched

Dimension table:

  Account  Company
  2222     BBBB
  1111     AAAA           -> account inserted later

Actual output:

  Company  Amount  Count
  AAAA     100     1

Desired output:

  Company  Amount  Count
  AAAA     300     3

The SQL is as follows:

    String sql2 = "insert into dws_b2b_trade_year_index\n" +
            "WITH temp AS (\n" +
            "select \n" +
            " ta.gmtStatistical as gmtStatistical,\n" +
            " ta.paymentMethod as paymentMethod,\n" +
            " tb.CORP_ID as outCorpId,\n" +
            " tc.CORP_ID as inCorpId,\n" +
            " sum(ta.tradeAmt) as tranAmount,\n" +
            " sum(ta.tradeCnt) as tranNum \n" +
            "from dws_a2a_trade_year_index ta \n" +
            "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
            "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
            "group by \n" +
            " ta.gmtStatistical, \n" +
            " ta.paymentMethod, \n" +
            " tb.CORP_ID, \n" +
            " tc.CORP_ID \n" +
            ") \n" +
            "SELECT \n" +
            " DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
            " gmtStatistical, \n" +
            " paymentMethod, \n" +
            " outCorpId, \n" +
            " inCorpId, \n" +
            " tranAmount, \n" +
            " tranNum \n" +
            "FROM temp";

Jason_H
hyb_he...@163.com
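The "actual output" in the question above follows from how a lookup join works: each Kafka record queries the dimension table once, at processing time (FOR SYSTEM_TIME AS OF ta.proc), and the joined row is not updated when the dimension table changes later. The plain-Java simulation below is a sketch of that behaviour under simplifying assumptions: one account column instead of outAcctCode/inAcctCode, no lookup cache, and hypothetical names (LookupJoinSim, Trade, run) that are not Flink APIs.

```java
import java.util.*;

// Plain-Java simulation (no Flink APIs) of a lookup join followed by GROUP BY.
// LookupJoinSim, Trade and run() are hypothetical names used for illustration.
class LookupJoinSim {
    static final String NULL = "NULL"; // stands in for SQL NULL (account not found)

    static final class Trade {
        final String acct;
        final long amount;
        final long cnt;

        Trade(String acct, long amount, long cnt) {
            this.acct = acct;
            this.amount = amount;
            this.cnt = cnt;
        }
    }

    // Processes trades in order. Just before trade number insertBefore, the row
    // (newAcct -> newCorp) is inserted into the dimension table by hand.
    // Returns corp -> {sum(amount), sum(cnt)}; initialDim is not modified.
    static Map<String, long[]> run(List<Trade> trades, Map<String, String> initialDim,
                                   int insertBefore, String newAcct, String newCorp) {
        Map<String, String> dim = new HashMap<>(initialDim);
        Map<String, long[]> agg = new HashMap<>();
        for (int i = 0; i < trades.size(); i++) {
            if (i == insertBefore) {
                dim.put(newAcct, newCorp);
            }
            Trade t = trades.get(i);
            // The lookup runs once, when the trade is processed; its result is
            // never revisited, so earlier trades stay in the NULL group.
            String corp = dim.getOrDefault(t.acct, NULL);
            long[] s = agg.computeIfAbsent(corp, k -> new long[2]);
            s[0] += t.amount;
            s[1] += t.cnt;
        }
        return agg;
    }

    public static void main(String[] args) {
        List<Trade> trades = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            trades.add(new Trade("1111", 100, 1));
        }
        Map<String, String> dim = new HashMap<>();
        dim.put("2222", "BBBB");
        run(trades, dim, 2, "1111", "AAAA")
                .forEach((corp, s) -> System.out.println(corp + " " + s[0] + " " + s[1]));
    }
}
```

Running main prints the groups "NULL 200 2" and "AAAA 100 1" (HashMap order, so the line order may vary). The AAAA group only ever sees the third trade, which reproduces "AAAA 100 1" from the question; the two earlier trades were joined against a table that did not yet contain 1111 and are never re-joined.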