Hi Leonard, Wow, that's great! It works like a charm. I've never considered this approach at all. Thanks a lot.
Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Kim
>
> Your attempts (2) and (3) failed because the json format does not support
> converting a BIGINT to a TIMESTAMP. You can first define the field as
> BIGINT and then use a computed column to derive a TIMESTAMP field. You can
> also define the time attribute on that TIMESTAMP field in order to use
> time-based operations in Flink 1.10.1. Note that computed columns are only
> supported in pure DDL; the Table API lacks this support, which should be
> aligned in 1.12 as far as I know.
>
> The DDL syntax is as follows:
>
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>   timestampCol as
>     TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> ) with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
>
> Best,
> Leonard Xu
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>
> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi,
> I use Flink 1.10.1 and I want to use the Table API to read JSON messages.
> The message looks like below.
>
>> {
>>   "type":"Update",
>>   "location":{
>>     "id":"123e4567-e89b-12d3-a456-426652340000",
>>     "lastUpdateTime":1593866161436
>>   }
>> }
>
> I wrote the following program just to see whether json messages are
> correctly parsed by Table API:
>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> EnvironmentSettings envSettings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>> tEnv
>>   .connect(
>>     new Kafka()
>>       .version("universal")
>>       .topic(consumerTopic)
>>       .startFromLatest()
>>       .properties(consumerProperties)
>>   )
>>   .withFormat(new Json())
>>   .withSchema(new Schema().schema(
>>     TableSchema.builder()
>>       .field("type", STRING())
>>       .field("location",
>>         ROW(
>>           FIELD("id", STRING()),
>>           // (1)
>>           FIELD("lastUpdateTime", BIGINT())
>>           // (2)
>>           FIELD("lastUpdateTime", TIMESTAMP())
>>           // (3)
>>           FIELD("lastUpdateTime",
>>             TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>         ))
>>       .build()
>>   ))
>>   .createTemporaryTable("message");
>> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>   .print();
>
> Note that I tried BIGINT(), TIMESTAMP(), and
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>
> (1) it works fine but later I can't use time-based operations like
> windowing.
>
> (2) it causes the following exception
>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>> the 'location' field of the TableSource return type.
>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>     at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>     at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>     at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>     at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id`
>> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not
>> match with the physical type ROW<`id` STRING, `lastUpdateTime`
>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>     ... 38 more
>
> (3) it causes the following exception
>
>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814'
>> could not be parsed at index 0
>>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>     ... 7 more
>
> Can I read such json messages with time information in Flink 1.10.1?
>
> Thanks
>
> Dongwon
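
For readers following the thread, here is a small plain-Java sketch (no Flink dependency) of the two points discussed above: why a bare epoch-millisecond value fails when treated as timestamp text, as in attempt (3), and what the computed column TO_TIMESTAMP(FROM_UNIXTIME(ms/1000, 'yyyy-MM-dd HH:mm:ss')) roughly does to the value. The class and method names are hypothetical, ISO_LOCAL_DATE_TIME is only a stand-in for whatever formatter the json format applies internally, and UTC is assumed where Flink would use its configured time zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper class for illustration only; not part of Flink.
final class EpochMillisDemo {

    // Same pattern string as in the FROM_UNIXTIME call of the DDL.
    private static final DateTimeFormatter SECONDS_PATTERN =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Returns true if the text parses as an ISO-8601 local date-time string. */
    static boolean parsesAsTimestampText(String text) {
        try {
            // Stand-in formatter (assumption); a run of digits is not date-time text.
            LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    /** Mirrors the computed column: seconds -> formatted string -> timestamp (UTC assumed). */
    static LocalDateTime viaFormattedSeconds(long epochMillis) {
        String formatted = LocalDateTime
                .ofEpochSecond(epochMillis / 1000, 0, ZoneOffset.UTC)
                .format(SECONDS_PATTERN);
        // The string has second precision, so the millisecond part is dropped.
        return LocalDateTime.parse(formatted, SECONDS_PATTERN);
    }

    /** Direct conversion that keeps the milliseconds, shown for comparison. */
    static LocalDateTime keepingMillis(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long lastUpdateTime = 1593866161436L; // value from the sample message
        System.out.println(parsesAsTimestampText("1593866161436")); // false
        System.out.println(viaFormattedSeconds(lastUpdateTime));    // 2020-07-04T12:36:01
        System.out.println(keepingMillis(lastUpdateTime));          // 2020-07-04T12:36:01.436
    }
}
```

Because the value passes through a second-precision string, the resulting timestamp loses the trailing 436 ms. That is usually acceptable for a watermark with a 5-second delay, but worth knowing if the column is also used for exact ordering.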