hehuiyuan created FLINK-14550:
---------------------------------
Summary: can't use proctime attribute when register datastream for
table and exist nested fields
Key: FLINK-14550
URL: https://issues.apache.org/jira/browse/FLINK-14550
Project: Flink
Issue Type: Improvement
Components: Table SQL / API
Reporter: hehuiyuan
*_The data schame :_*
final String schemaString =
"{\"type\":\"record\",\"name\":\"GenericUser\",\"namespace\":\"org.apache.flink.formats.avro.generated\","
+
"\"fields\":
[\{\"name\":\"name\",\"type\":\"string\"},\{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},"
+
"\{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},\{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]}"
+
",\{\"name\":\"type_double_test\",\"type\":\"double\"},\{\"name\":\"type_null_test\",\"type\":[\"null\",\"string\"]},"
+
"\{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":"
+
"\{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\","
+
"\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\","
+
"\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\","
+
"\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\","
+
"\"values\":\"long\"}},{\"name\":\"type_fixed\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"Fixed16\","
+
"\"size\":16}],\"size\":16},\{\"name\":\"type_union\",\"type\":[\"null\",\"boolean\",\"long\",\"double\"]},"
+
*"{\"name\":\"type_nested\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Address\",\"fields\":[{\"name\":\"num\","
+*
*"\"type\":\"int\"},\{\"name\":\"street\",\"type\":\"string\"},\{\"name\":\"city\",\"type\":\"string\"},"
+*
*"\{\"name\":\"state\",\"type\":\"string\"},\{\"name\":\"zip\",\"type\":\"string\"}]}]},*{\"name\":\"type_bytes\","
+
"\"type\":\"bytes\"},\{\"name\":\"type_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+
"\{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
+
"\"type\":\{\"type\":\"int\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
+
"\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
+
"\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
+
"\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
+
"\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
*_The code :_*
tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street,userActionTime.proctime");
_*The error is as follows:*_
Exception in thread "main" org.apache.flink.table.api.TableException: The
proctime attribute can only be appended to the table schema and not replace an
existing field. Please move 'userActionTime' to the end of the schema.Exception
in thread "main" org.apache.flink.table.api.TableException: The proctime
attribute can only be appended to the table schema and not replace an existing
field. Please move 'userActionTime' to the end of the schema. at
org.apache.flink.table.api.StreamTableEnvironment.org$apache$flink$table$api$StreamTableEnvironment$$extractProctime$1(StreamTableEnvironment.scala:649)
at
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:676)
at
org.apache.flink.table.api.StreamTableEnvironment$$anonfun$validateAndExtractTimeAttributes$1.apply(StreamTableEnvironment.scala:668)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at
org.apache.flink.table.api.StreamTableEnvironment.validateAndExtractTimeAttributes(StreamTableEnvironment.scala:668)
at
org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:549)
at
org.apache.flink.table.api.java.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:136)
at
com.jd.flink.sql.demo.validate.schema.avro.AvroQuickStartMain.main(AvroQuickStartMain.java:145)
The code is ok.
tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested$Address$street");
The code is ok.
tableEnv.registerDataStream("source_table",deserilizeDataStreamRows,"type_nested,userActionTime.proctime");
--
This message was sent by Atlassian Jira
(v8.3.4#803005)