shizhengchao created FLINK-25124: ------------------------------------ Summary: A deadlock occurs when the jdbc sink uses two consecutive dimension tables to associate Key: FLINK-25124 URL: https://issues.apache.org/jira/browse/FLINK-25124 Project: Flink Issue Type: Bug Components: Connectors / JDBC Affects Versions: 1.13.1 Reporter: shizhengchao
The sql statement is as follows: {code:java} //代码占位符 INSERT INTO imei_phone_domestic_realtime SELECT t.data.imei AS imei, CAST(t.data.register_date_key AS bigint) AS register_date_key, c.agent_type AS channel_name, c.agent_short_name, c.agent_name, c.agent_chinese_name, c.isforeign AS agent_market_type, p.seriename AS series_name, p.salename AS sale_name, p.devname AS dev_name, p.devnamesource AS dev_name_source, p.color, p.isforeign AS product_market_type, p.carrier, p.lcname AS life_cycle, IFNULL(p.shipping_price,0) AS shipping_price, IFNULL(p.retail_price,0) AS retail_price FROM kafka_imei_phone_domestic_realtime AS t LEFT JOIN dim_product FOR SYSTEM_TIME AS OF t.proctime AS p ON p.pn=t.item_code LEFT JOIN dim_customer FOR SYSTEM_TIME AS OF t.proctime AS c ON c.customer_code=t.customer_code where t.eventType='update'; {code} There will be a probability of deadlock: {code:java} //代码占位符 "jdbc-upsert-output-format-thread-1" Id=84 BLOCKED on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af owned by "Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, kafka_imei_phone_domestic_realtime]], fields=[data, eventType]) -> Calc(select=[data, data.item_code AS $f3], where=[(eventType = _UTF-16LE'update':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) -> LookupJoin(table=[default_catalog.default_database.dim_product], joinType=[LeftOuterJoin], async=[false], lookup=[pn=$f3], select=[data, $f3, pn, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price]) -> Calc(select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price, data.customer_code AS $f31]) -> LookupJoin(table=[default_catalog.default_database.dim_customer], joinType=[LeftOuterJoin], async=[false], lookup=[customer_code=$f31], select=[data, color, isforeign, devname, salename, seriename, lcname, carrier, devnamesource, shipping_price, retail_price, $f31, customer_code, agent_short_name, agent_name, isforeign, agent_type, agent_chinese_name]) -> Calc(select=[data.imei AS imei, CAST(data.register_date_key) AS register_date_key, agent_type AS channel_name, agent_short_name, agent_name, agent_chinese_name, isforeign0 AS agent_market_type, seriename AS series_name, salename AS sale_name, devname AS dev_name, devnamesource AS dev_name_source, color, isforeign AS product_market_type, carrier, lcname AS life_cycle, IFNULL(shipping_price, 0:DECIMAL(10, 0)) AS shipping_price, IFNULL(retail_price, 0:DECIMAL(10, 0)) AS retail_price]) -> NotNullEnforcer(fields=[imei]) -> Sink: Sink(table=[default_catalog.default_database.imei_phone_domestic_realtime], fields=[imei, register_date_key, channel_name, agent_short_name, agent_name, agent_chinese_name, agent_market_type, series_name, sale_name, dev_name, dev_name_source, color, product_market_type, carrier, life_cycle, shipping_price, retail_price]) (6/12)#0" Id=82 at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:124) - blocked on org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat@649788af at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat$$Lambda$344/21845506.run(Unknown Source) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... Number of locked synchronizers = 1 - java.util.concurrent.ThreadPoolExecutor$Worker@325612a2 {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)