[ https://issues.apache.org/jira/browse/FLINK-17189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087390#comment-17087390 ]
Rui Li commented on FLINK-17189:
--------------------------------

The root cause is that {{TimestampKind}} is not encoded into the serializable string of {{TimestampType}}, {{LocalZonedTimestampType}} and {{ZonedTimestampType}}. In {{HiveCatalog}}, the table schema of a generic table is serialized using a {{DescriptorProperties}} which in turn relies on the serializable strings of logical types. Now since {{TimestampKind}} is missing, {{HiveCatalog}} is unable to properly deserialize the table schema and thus the error.

I tried including {{TimestampKind}} in {{DescriptorProperties}}, e.g. by calling {{LogicalType::asSummaryString}} instead of {{LogicalType::asSerializableString}} and that fixes the issue. But I'm not sure whether that's the way to go, because it seems {{TimestampKind}} is excluded from the serializable string by design.

[~twalthr] what do you think is the right way to fix the issue?

> Table with processing time attribute can not be read from Hive catalog
> ----------------------------------------------------------------------
>
>                 Key: FLINK-17189
>                 URL: https://issues.apache.org/jira/browse/FLINK-17189
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Ecosystem, Table SQL / Planner
>            Reporter: Timo Walther
>            Priority: Major
>             Fix For: 1.11.0, 1.10.2
>
>
> DDL:
> {code}
> CREATE TABLE PROD_LINEITEM (
>   L_ORDERKEY INTEGER,
>   L_PARTKEY INTEGER,
>   L_SUPPKEY INTEGER,
>   L_LINENUMBER INTEGER,
>   L_QUANTITY DOUBLE,
>   L_EXTENDEDPRICE DOUBLE,
>   L_DISCOUNT DOUBLE,
>   L_TAX DOUBLE,
>   L_CURRENCY STRING,
>   L_RETURNFLAG STRING,
>   L_LINESTATUS STRING,
>   L_ORDERTIME TIMESTAMP(3),
>   L_SHIPINSTRUCT STRING,
>   L_SHIPMODE STRING,
>   L_COMMENT STRING,
>   WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
>   L_PROCTIME AS PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'Lineitem',
>   'connector.properties.zookeeper.connect' = 'not-needed',
>   'connector.properties.bootstrap.servers' = 'kafka:9092',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv',
>   'format.field-delimiter' = '|'
> );
> {code}
> Query:
> {code}
> SELECT * FROM prod_lineitem;
> {code}
> Result:
> {code}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve
> datatypes:
> validated type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT,
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL
> L_PROCTIME) NOT NULL
> converted type:
> RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER
> L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT,
> DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME
> ATTRIBUTE(ROWTIME) L_ORDERTIME, VARCHAR(2147483647) CHARACTER SET "UTF-16LE"
> L_SHIPINSTRUCT, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_COMMENT, TIME
> ATTRIBUTE(PROCTIME) NOT NULL L_PROCTIME) NOT NULL
> rel:
> LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2],
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6],
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10],
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14],
> L_PROCTIME=[$15])
>   LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11,
> 300000:INTERVAL MINUTE)])
>     LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2],
> L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6],
> L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10],
> L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14],
> L_PROCTIME=[PROCTIME()])
>       LogicalTableScan(table=[[hcat, default, prod_lineitem, source:
> [KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY,
> L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS,
> L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)