[ 
https://issues.apache.org/jira/browse/FLINK-27639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541808#comment-17541808
 ] 

lvycc commented on FLINK-27639:
-------------------------------

Run  init  sql in Mysql:
{code:java}
//代码占位符
CREATE TABLE `t_order`  (
  `order_id` int NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `order_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci 
NULL DEFAULT NULL COMMENT '订单名称',
  `product_id` int NULL DEFAULT NULL COMMENT '商品id',
  `user_id` int NULL DEFAULT NULL COMMENT '用户id',
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 7 CHARACTER SET = utf8mb4 COLLATE = 
utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_order
-- ----------------------------
INSERT INTO `t_order` VALUES (1, '李四买吹风机4', 1, 2);
INSERT INTO `t_order` VALUES (2, '王二买电饭煲4', 2, 1);
INSERT INTO `t_order` VALUES (3, '小明买口罩4', 3, 4);
INSERT INTO `t_order` VALUES (4, '张三买吹风机3', 1, 3);

CREATE TABLE `t_logistics`  (
  `logistics_id` int NOT NULL AUTO_INCREMENT,
  `logistics_target` varchar(255) CHARACTER SET utf8mb4 COLLATE 
utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `logistics_source` varchar(255) CHARACTER SET utf8mb4 COLLATE 
utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `logistics_time` datetime(0) NULL DEFAULT NULL,
  `order_id` int NULL DEFAULT NULL,
  PRIMARY KEY (`logistics_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8mb4 COLLATE = 
utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of t_logistics
-- ----------------------------
INSERT INTO `t_logistics` VALUES (1, '湖南省长沙市', '广东省揭阳市', '2022-04-07 11:46:52', 
1);
INSERT INTO `t_logistics` VALUES (2, '广东省深圳市', '浙江省杭州市', '2022-04-06 11:48:09', 
2);
INSERT INTO `t_logistics` VALUES (3, '北京市', '山东省青岛市', '2022-04-06 11:48:50', 3);
INSERT INTO `t_logistics` VALUES (4, '上海市', '广东省揭阳市', '2022-04-07 11:49:24', 4);

CREATE TABLE `t_join_sink`  (
  `order_id` int NOT NULL,
  `order_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci 
NULL DEFAULT NULL,
  `logistics_id` int NULL DEFAULT NULL,
  `logistics_target` varchar(255) CHARACTER SET utf8mb4 COLLATE 
utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `logistics_source` varchar(255) CHARACTER SET utf8mb4 COLLATE 
utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  `logistics_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`order_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci 
ROW_FORMAT = Dynamic; {code}
Then start the flink  job, in the end, run delete sql
{code:java}
//代码占位符
delete from t_order where id = 1;{code}
the table `t_join_sink` not delete

> Flink JOIN uses the now() function when inserting data, resulting in data 
> that cannot be deleted
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-27639
>                 URL: https://issues.apache.org/jira/browse/FLINK-27639
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.4
>            Reporter: lvycc
>            Priority: Major
>
> I use the now() function as the field value when I insert data using SQL ,but 
> I can't delete the inserted data,here is my sql:
> {code:java}
> //代码占位符
> CREATE TABLE t_order (
>     order_id INT,
>     order_name STRING,
>     product_id INT,
>     user_id INT,
>     PRIMARY KEY(order_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = 'localhost',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = 'ycc123',
>     'database-name' = 'wby_test',
>     'table-name' = 't_order'
> );
> CREATE TABLE t_logistics (
>     logistics_id INT,
>     logistics_target STRING,
>     logistics_source STRING,
>     logistics_time TIMESTAMP(0),
>     order_id INT,
>     PRIMARY KEY(logistics_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = 'localhost',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = 'ycc123',
>     'database-name' = 'wby_test',
>     'table-name' = 't_logistics'
> );
> CREATE TABLE t_join_sink (
>     order_id INT,
>     order_name STRING,
>     logistics_id INT,
>     logistics_target STRING,
>     logistics_source STRING,
>     logistics_time timestamp,
>     PRIMARY KEY(order_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 
> 'jdbc:mysql://localhost:3306/wby_test?characterEncoding=utf8&useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai',
>     'table-name' = 't_join_sink',
>     'username' = 'root',
>     'password' = 'ycc123'
> );
> INSERT INTO t_join_sink
> SELECT ord.order_id,
> ord.order_name,
> logistics.logistics_id,
> logistics.logistics_target,
> logistics.logistics_source,
> now()
> FROM t_order AS ord
> LEFT JOIN t_logistics AS logistics ON ord.order_id=logistics.order_id; {code}
> The debug finds that SinkUpsertMaterializer causes the problem ,the result of 
> the now() function changes when I delete the data,therefore, the delete 
> operation is ignored
> But what can I do to avoid this problem?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to