Question: In Flink 1.13.6, how do I set a Hive sink table to upsert mode using pure SQL?
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2
Pure-SQL job: Kafka is the source, the intermediate transformations include DISTINCT or GROUP BY operations, and sinking the computed result to a Hive table fails with the error below:
doesn't support consuming update changes which is produced by node
GroupAggregate(groupBy=[id, user_id, status, EXPR$3]
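From what I can tell (my understanding from the docs, happy to be corrected): DISTINCT and GROUP BY produce an updating changelog, while the Hive connector sink only consumes append-only streams, so the planner rejects the plan no matter which table properties are set. Connectors that do accept update changes declare a PRIMARY KEY, e.g. upsert-kafka. A minimal sketch, with made-up table and topic names:

```sql
-- Hypothetical intermediate table (names are made up): 'upsert-kafka'
-- accepts an updating changelog because rows are keyed by the PRIMARY KEY.
CREATE TABLE dedup_person (
  id STRING,
  user_id STRING,
  status STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'person_dedup',
  'properties.bootstrap.servers' = '192.168.9.116:9092',
  'key.format' = 'json',
  'value.format' = 'json'
)
```

But my target is a Hive table, not Kafka, which is where I am stuck.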
I searched online and the answers say the sink table needs to be set to upsert mode. I tried creating the sink table as shown below; the CREATE TABLE succeeds, but submitting the INSERT INTO still throws the same error.
Source table:
CREATE TABLE data_2432_5074_model (
  id STRING,
  user_id STRING,
  status STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'person',
  'properties.bootstrap.servers' = '192.168.9.116:9092',
  'properties.group.id' = 'chinaoly-group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true'
)
Sink table:
CREATE TABLE output_2432_5076_model_1649226175146 (
  id STRING,
  user_id STRING,
  status STRING,
  my_dt TIMESTAMP
) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'all',
  'streaming-source.partition-order' = 'create-time',
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'write.upsert.enable' = 'true',
  'streaming-source.monitor-interval' = '1 min'
)
Computation logic:
INSERT INTO output_2432_5076_model_1649226175146
SELECT DISTINCT id AS id, user_id AS user_id, status AS status, PROCTIME()
FROM (SELECT * FROM data_2432_5074_model)
WHERE status = '1'
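In case it helps to diagnose: one variant I am considering instead of DISTINCT is the keep-first-row deduplication pattern from the Flink docs. If I read the docs correctly, ordering by a processing-time attribute ascending and keeping the first row lets the planner produce an append-only stream, which a Hive sink should be able to consume. A hedged, untested sketch:

```sql
-- Untested sketch: keep-first-row dedup over processing time. Per my reading
-- of the Flink docs, ORDER BY proctime ASC with rn = 1 yields an append-only
-- changelog instead of the updating one produced by DISTINCT.
INSERT INTO output_2432_5076_model_1649226175146
SELECT id, user_id, status, my_dt
FROM (
  SELECT id, user_id, status, my_dt,
         ROW_NUMBER() OVER (
           PARTITION BY id, user_id, status
           ORDER BY my_dt ASC
         ) AS rn
  FROM (
    SELECT id, user_id, status, PROCTIME() AS my_dt
    FROM data_2432_5074_model
    WHERE status = '1'
  )
)
WHERE rn = 1
```

Is this the recommended way to write deduplicated results to Hive, or is there a proper upsert setting for the Hive connector itself?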
Dear all-knowing officials, could you give me an answer? Thanks in advance.
[email protected]