Timo Walther created FLINK-17189:
------------------------------------

             Summary: Table with processing time attribute can not be read from 
Hive catalog
                 Key: FLINK-17189
                 URL: https://issues.apache.org/jira/browse/FLINK-17189
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Ecosystem, Table SQL / Planner
            Reporter: Timo Walther


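DDL (Kafka-backed table with a rowtime watermark on L_ORDERTIME and a computed processing-time column L_PROCTIME):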
{code}
CREATE TABLE PROD_LINEITEM (
  -- Reproduction table for this report: fifteen physical columns, one
  -- watermark declaration and one computed processing-time column.
  L_ORDERKEY       INTEGER,
  L_PARTKEY        INTEGER,
  L_SUPPKEY        INTEGER,
  L_LINENUMBER     INTEGER,
  L_QUANTITY       DOUBLE,
  L_EXTENDEDPRICE  DOUBLE,
  L_DISCOUNT       DOUBLE,
  L_TAX            DOUBLE,
  L_CURRENCY       STRING,
  L_RETURNFLAG     STRING,
  L_LINESTATUS     STRING,
  L_ORDERTIME      TIMESTAMP(3),
  L_SHIPINSTRUCT   STRING,
  L_SHIPMODE       STRING,
  L_COMMENT        STRING,
  -- Marks L_ORDERTIME as the rowtime attribute; the watermark trails it by
  -- five minutes.
  WATERMARK FOR L_ORDERTIME AS L_ORDERTIME - INTERVAL '5' MINUTE,
  -- Computed column, not a physical field of the source. This is the column
  -- whose type differs between the validated and converted row types in the
  -- error reported below.
  L_PROCTIME       AS PROCTIME()
) WITH (
  -- Source: Kafka topic 'Lineitem', using the 'universal' connector version.
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'Lineitem',
  -- The ZooKeeper address is set to the literal placeholder 'not-needed'.
  'connector.properties.zookeeper.connect' = 'not-needed',
  'connector.properties.bootstrap.servers' = 'kafka:9092',
  -- Start consuming from the earliest available offset.
  'connector.startup-mode' = 'earliest-offset',
  -- Records are CSV with '|' as the field delimiter.
  'format.type' = 'csv',
  'format.field-delimiter' = '|'
);
{code}

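Query (the table is referenced by its lower-case name, as it appears in the catalog path in the plan below):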
{code}
SELECT * FROM prod_lineitem;
{code}

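Result (the two row types differ only in L_PROCTIME: the validated type has TIMESTAMP(3) NOT NULL, while the converted type has TIME ATTRIBUTE(PROCTIME) NOT NULL):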
{code}
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME ATTRIBUTE(ROWTIME) L_ORDERTIME, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPINSTRUCT, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" L_COMMENT, TIMESTAMP(3) NOT NULL L_PROCTIME) NOT NULL
converted type:
RecordType(INTEGER L_ORDERKEY, INTEGER L_PARTKEY, INTEGER L_SUPPKEY, INTEGER 
L_LINENUMBER, DOUBLE L_QUANTITY, DOUBLE L_EXTENDEDPRICE, DOUBLE L_DISCOUNT, 
DOUBLE L_TAX, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_CURRENCY, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_RETURNFLAG, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" L_LINESTATUS, TIME ATTRIBUTE(ROWTIME) L_ORDERTIME, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPINSTRUCT, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" L_SHIPMODE, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" L_COMMENT, TIME ATTRIBUTE(PROCTIME) NOT NULL 
L_PROCTIME) NOT NULL
rel:
LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
L_PROCTIME=[$15])
  LogicalWatermarkAssigner(rowtime=[L_ORDERTIME], watermark=[-($11, 
300000:INTERVAL MINUTE)])
    LogicalProject(L_ORDERKEY=[$0], L_PARTKEY=[$1], L_SUPPKEY=[$2], 
L_LINENUMBER=[$3], L_QUANTITY=[$4], L_EXTENDEDPRICE=[$5], L_DISCOUNT=[$6], 
L_TAX=[$7], L_CURRENCY=[$8], L_RETURNFLAG=[$9], L_LINESTATUS=[$10], 
L_ORDERTIME=[$11], L_SHIPINSTRUCT=[$12], L_SHIPMODE=[$13], L_COMMENT=[$14], 
L_PROCTIME=[PROCTIME()])
      LogicalTableScan(table=[[hcat, default, prod_lineitem, source: 
[KafkaTableSource(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, 
L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_CURRENCY, L_RETURNFLAG, L_LINESTATUS, 
L_ORDERTIME, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)]]])
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to