Hi,
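Isn't it enough to patch the 1.10 source according to the PR in the JIRA issue?
The HBase DDL is probably not very relevant here, so I'll just post the Kafka one.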
Flink SQL> CREATE TABLE kafka_test1 (
> id varchar,
> a varchar,
> b int,
> ts as PROCTIME()
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.version' = '0.11',
> 'connector.topic' = 'test',
> 'connector.properties.zookeeper.connect' = 'localnode2:2181',
> 'connector.properties.bootstrap.servers' = 'localnode2:9092',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'latest-offset',
> 'format.type' = 'json'
> )
> ;
[INFO] Table has been created.
Flink SQL> select a.*, b.* from kafka_test1 a join hbase_test1 FOR SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey;
Exception:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve
datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT NULL ts,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey,
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a,
INTEGER b) f) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647)
CHARACTER SET "UTF-16LE" a, INTEGER b, TIME ATTRIBUTE(PROCTIME) NOT NULL ts,
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey,
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a,
INTEGER b) f) NOT NULL
rel:
LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0,
3}])
LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
LogicalTableScan(table=[[tgou, collie, kafka_test1, source:
[Kafka011TableSource(id, a, b)]]])
LogicalFilter(condition=[=($cor1.id, $0)])
LogicalSnapshot(period=[$cor1.ts])
LogicalTableScan(table=[[tgou, collie, hbase_test1, source:
[HBaseTableSource[schema=[rowkey, f], projectFields=null]]]])
Best,
Xinghalo