Thanks for reporting. Could you please help create a Jira issue for it?

Best regards,
Yuxia
----- Original Message -----
From: "Xiaolong Wang" <xiaolong.w...@smartnews.com.INVALID>
To: "dev" <dev@flink.apache.org>
Sent: Thursday, March 28, 2024 5:11:20 PM
Subject: Re: Bug report for reading Hive table as streaming source.

I think it's worth mentioning in the documentation of the Hive read that it
cannot read a table that has more than 32,767 partitions.

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
wrote:

> Found the reason:
>
> It turns out that Flink uses Hive's IMetaStoreClient to fetch partitions
> via the following method:
>
> List<String> listPartitionNames(String db_name, String tbl_name,
>     short max_parts) throws MetaException, TException;
>
> where max_parts is the maximum number of partitions it can fetch from the
> Hive metastore. Since the parameter is a short, the most it can fetch is
> Short.MAX_VALUE, i.e. 32767.
>
> But the table has far more partitions than that, so the list-partitions
> operation cannot fetch them all, and hence the job never consumes the
> recent partitions.
>
> On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
> wrote:
>
>> Hi,
>>
>> I found a weird bug when reading a Hive table as a streaming source.
>>
>> In summary, if the first partition column is not time related, the Hive
>> table cannot be read as a streaming source.
>>
>> e.g.
>>
>> I have a Hive table defined as
>>
>> ```
>> CREATE TABLE article (
>>     id BIGINT,
>>     edition STRING,
>>     dt STRING,
>>     hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> USING orc;
>> ```
>>
>> Then I try to query it as a streaming source:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>>     'streaming-source.partition-order' = 'partition-name',
>>     'streaming-source.consume-start-offset' =
>>         'edition=en_US/dt=2024-03-26/hh=00') */
>> ```
>>
>> And I see no output in the `kafka_sink`.
>>
>> Then I defined an external table pointing to the same path, but without
>> the `edition` partition column:
>>
>> ```
>> CREATE TABLE en_article (
>>     id BIGINT,
>>     dt STRING,
>>     hh STRING
>> )
>> PARTITIONED BY (dt, hh)
>> LOCATION 's3://xxx/article/edition=en_US'
>> USING orc;
>> ```
>>
>> And insert with the following statement:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>>     'streaming-source.partition-order' = 'partition-name',
>>     'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> ```
>>
>> This time the data reaches `kafka_sink` as expected.
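For reference, a minimal sketch of the truncation described above, assuming a
connected IMetaStoreClient; the database/table names are made up and this is
illustrative only, not Flink's actual code:

```
import java.util.List;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TException;

public class PartitionListingSketch {

    static void demo(IMetaStoreClient client) throws MetaException, TException {
        // max_parts is a short, so the largest possible request is
        // Short.MAX_VALUE = 32767; a table with more partitions than that
        // is silently truncated, and the most recent partitions (the ones
        // a streaming source polls for) may never show up in the result.
        List<String> capped =
                client.listPartitionNames("default", "article", Short.MAX_VALUE);

        // The metastore conventionally treats a non-positive max_parts as
        // "no limit", so (short) -1 should return every partition name.
        List<String> all =
                client.listPartitionNames("default", "article", (short) -1);

        System.out.printf("capped: %d, unlimited: %d%n",
                capped.size(), all.size());
    }
}
```

If the metastore honors the non-positive-means-unlimited convention, passing
(short) -1 instead of Short.MAX_VALUE would sidestep the 32,767 cap entirely.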