[ https://issues.apache.org/jira/browse/FLINK-27639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541808#comment-17541808 ]
lvycc commented on FLINK-27639: ------------------------------- Run init sql in Mysql: {code:java} //代码占位符 CREATE TABLE `t_order` ( `order_id` int NOT NULL AUTO_INCREMENT COMMENT '订单id', `order_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '订单名称', `product_id` int NULL DEFAULT NULL COMMENT '商品id', `user_id` int NULL DEFAULT NULL COMMENT '用户id', PRIMARY KEY (`order_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 7 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of t_order -- ---------------------------- INSERT INTO `t_order` VALUES (1, '李四买吹风机4', 1, 2); INSERT INTO `t_order` VALUES (2, '王二买电饭煲4', 2, 1); INSERT INTO `t_order` VALUES (3, '小明买口罩4', 3, 4); INSERT INTO `t_order` VALUES (4, '张三买吹风机3', 1, 3); CREATE TABLE `t_logistics` ( `logistics_id` int NOT NULL AUTO_INCREMENT, `logistics_target` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `logistics_source` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `logistics_time` datetime(0) NULL DEFAULT NULL, `order_id` int NULL DEFAULT NULL, PRIMARY KEY (`logistics_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of t_logistics -- ---------------------------- INSERT INTO `t_logistics` VALUES (1, '湖南省长沙市', '广东省揭阳市', '2022-04-07 11:46:52', 1); INSERT INTO `t_logistics` VALUES (2, '广东省深圳市', '浙江省杭州市', '2022-04-06 11:48:09', 2); INSERT INTO `t_logistics` VALUES (3, '北京市', '山东省青岛市', '2022-04-06 11:48:50', 3); INSERT INTO `t_logistics` VALUES (4, '上海市', '广东省揭阳市', '2022-04-07 11:49:24', 4); CREATE TABLE `t_join_sink` ( `order_id` int NOT NULL, `order_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `logistics_id` int NULL DEFAULT NULL, `logistics_target` varchar(255) CHARACTER SET 
utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `logistics_source` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL, `logistics_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`order_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic; {code} Then start the Flink job and, finally, run the delete SQL: {code:java} //代码占位符 delete from t_order where order_id = 1;{code} The corresponding row in the table `t_join_sink` is not deleted. > Flink JOIN uses the now() function when inserting data, resulting in data > that cannot be deleted > ------------------------------------------------------------------------------------------------ > > Key: FLINK-27639 > URL: https://issues.apache.org/jira/browse/FLINK-27639 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.14.4 > Reporter: lvycc > Priority: Major > > I use the now() function as the field value when I insert data using SQL, but > I can't delete the inserted data. Here is my SQL: > {code:java} > //代码占位符 > CREATE TABLE t_order ( > order_id INT, > order_name STRING, > product_id INT, > user_id INT, > PRIMARY KEY(order_id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = 'ycc123', > 'database-name' = 'wby_test', > 'table-name' = 't_order' > ); > CREATE TABLE t_logistics ( > logistics_id INT, > logistics_target STRING, > logistics_source STRING, > logistics_time TIMESTAMP(0), > order_id INT, > PRIMARY KEY(logistics_id) NOT ENFORCED > ) WITH ( > 'connector' = 'mysql-cdc', > 'hostname' = 'localhost', > 'port' = '3306', > 'username' = 'root', > 'password' = 'ycc123', > 'database-name' = 'wby_test', > 'table-name' = 't_logistics' > ); > CREATE TABLE t_join_sink ( > order_id INT, > order_name STRING, > logistics_id INT, > logistics_target STRING, > logistics_source STRING, > logistics_time timestamp, > PRIMARY KEY(order_id) NOT ENFORCED > ) WITH ( > 
'connector' = 'jdbc', > 'url' = > 'jdbc:mysql://localhost:3306/wby_test?characterEncoding=utf8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai', > 'table-name' = 't_join_sink', > 'username' = 'root', > 'password' = 'ycc123' > ); > INSERT INTO t_join_sink > SELECT ord.order_id, > ord.order_name, > logistics.logistics_id, > logistics.logistics_target, > logistics.logistics_source, > now() > FROM t_order AS ord > LEFT JOIN t_logistics AS logistics ON ord.order_id=logistics.order_id; {code} > Debugging shows that SinkUpsertMaterializer causes the problem: the result of > the now() function changes when I delete the data; therefore, the delete > operation is ignored. > But what can I do to avoid this problem? -- This message was sent by Atlassian Jira (v8.20.7#820007)