用的版本为 Flink 1.17,当前先在 Hive 中创建了 partition_test 这张表。

在代码中也指定了:sink.partition-commit.policy.kind,但是实际执行还是报上面的错,但是如果不在 Hive
中创建这张表,使用 Flink 来创建这张表就能够执行。
这是不是 Flink 1.17 的 BUG?

CREATE CATALOG my_hive_catalog
WITH (
'type' = 'hive',
-- 指定默认的 hive 数据库
'default-database' = 'zhoujielun'
);

use catalog my_hive_catalog;

CREATE TEMPORARY TABLE source_kafka (a STRING, b STRING)
WITH (
'connector' = 'datagen',
-- 每秒一条数据
'rows-per-second' = '1'
);

CREATE TABLE IF NOT EXISTS partition_test (a STRING, b STRING, date_id
STRING)
PARTITIONED BY (date_id)
WITH (
'connector' = 'hive',
'sink.partition-commit.policy.kind' = 'metastore,success-file',
'sink.partition-commit.delay' = '1s'
);

INSERT INTO partition_test
SELECT a,
b,
DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') AS dt
FROM source_kafka;

Reply via email to