Best,
Danny Chan

---------- Forwarded message ----------
From: Danny Chan <yuzhao....@gmail.com>
Date: Jul 20, 2020, 4:51 PM (+0800)
To: Dongwon Kim <eastcirc...@gmail.com>
Subject: Re: [Table API] how to configure a nested timestamp field
Or is it possible that you pre-define a catalog there and register it through the SQL CLI YAML?

Best,
Danny Chan

On Jul 20, 2020, 3:23 PM (+0800), Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Leonard,

> Unfortunately the answer is no. The YAML you defined will be parsed by the Table API and then executed, and the root cause of the error you posted is that the Table API does not support computed columns yet; there is a FLIP under discussion [1], which should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.

Okay, thanks a lot for your input.

I just tried out the Flink SQL client and wanted to store pre-defined YAML files, each declaring a source table from a Kafka topic. As you advised, I have to manually enter DDL in the SQL client on Flink 1.11.x.

Best,

Dongwon

On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Kim

> Hi Leonard,
>
> Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no. The YAML you defined will be parsed by the Table API and then executed, and the root cause of the error you posted is that the Table API does not support computed columns yet; there is a FLIP under discussion [1], which should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.

Best,
Leonard Xu
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
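[Editor's note: Danny's suggestion above refers to the catalogs section of the SQL CLI YAML. A minimal sketch of such a pre-defined catalog entry, assuming a Hive catalog is available; the catalog name, conf directory, and default database are illustrative only:

catalogs:
  - name: myhive                    # illustrative catalog name
    type: hive
    hive-conf-dir: /opt/hive-conf   # assumed path to the Hive configuration
    default-database: mydatabase

Tables created with DDL against such a catalog are stored in the catalog itself (here, the Hive Metastore), so they survive SQL client sessions without having to be re-declared as YAML table definitions.]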
On Jul 20, 2020, 14:30, Dongwon Kim <eastcirc...@gmail.com> wrote:

I tried the YAML below (Flink 1.11.0) but got an error:

tables:
  - name: test
    type: source-table
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ...
      properties:
        bootstrap.servers: ...
        group.id: ...
    format:
      property-version: 1
      type: json
    schema:
      - name: type
        data-type: STRING
      - name: location
        data-type: >
          ROW<
            id STRING,
            lastUpdateTime BIGINT
          >
      - name: timestampCol
        data-type: TIMESTAMP(3)
        rowtime:
          timestamps:
            type: from-field
            from: >
              TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
          watermarks:
            type: periodic-bounded
            delay: 5000

The SQL client doesn't complain about the file, but when I execute "SELECT timestampCol FROM test" the job fails with the following error message:

Caused by: java.lang.NullPointerException
    at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at SourceConversion$4.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
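[Editor's note: since the YAML route fails, the same table can be declared directly in the Flink 1.11 SQL client as DDL. A minimal sketch, essentially the DDL Leonard suggests further down the thread with the 1.11 Kafka connector options filled in as an assumption; the topic and broker values stay as placeholders, and scan.startup.mode mirrors the startFromLatest() call in the original program:

CREATE TABLE test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '...',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = '...',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);]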
On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi Leonard,

Wow, that's great! It works like a charm.
I had never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Kim

The reason your attempts (2) and (3) failed is that the JSON format does not support converting a BIGINT into a TIMESTAMP. You can first define the BIGINT field and then use a computed column to derive the TIMESTAMP field, and you can also define the time attribute on that TIMESTAMP field so that time-based operations work in Flink 1.10.1. But computed columns are only supported in pure DDL; the Table API lacks this support, which should be aligned in 1.12 as far as I know.
The DDL syntax is as follows:

create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
  timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
) with (
  'connector' = '...',
  'format' = 'json',
  ...
);

Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
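[Editor's note: a minimal sketch of how this DDL could be wired into the original Java program on Flink 1.10.x, assuming the blink planner; on 1.10 DDL is submitted with sqlUpdate (executeSql from 1.11 on). The class name is illustrative, and the connector/format properties are deliberately left as '...' placeholders, exactly as in Leonard's DDL above:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class NestedTimestampExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

    // Register the table via DDL so that the computed column and watermark are
    // supported (the descriptor API cannot express them).
    tEnv.sqlUpdate(
        "CREATE TABLE test (\n"
            + "  `type` STRING,\n"
            + "  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,\n"
            // e.g. 1593866161436 ms -> 1593866161 s -> '2020-07-04 12:36:01' in UTC
            // (FROM_UNIXTIME renders the string in the session time zone)
            + "  timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),\n"
            + "  WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND\n"
            + ") WITH (\n"
            + "  'connector' = '...',\n"   // fill in the Kafka connector properties
            + "  'format' = 'json'\n"
            + ")");

    // Query the table with the event-time attribute available for windowing.
    tEnv.toAppendStream(tEnv.from("test"), Row.class).print();
    env.execute("nested timestamp example");
  }
}]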
On Jul 4, 2020, 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:

Hi,

I use Flink 1.10.1 and I want to use the Table API to read JSON messages. A message looks like this:

{
  "type": "Update",
  "location": {
    "id": "123e4567-e89b-12d3-a456-426652340000",
    "lastUpdateTime": 1593866161436
  }
}

I wrote the following program just to see whether the JSON messages are correctly parsed by the Table API:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
EnvironmentSettings envSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
tEnv
    .connect(
        new Kafka()
            .version("universal")
            .topic(consumerTopic)
            .startFromLatest()
            .properties(consumerProperties)
    )
    .withFormat(new Json())
    .withSchema(new Schema().schema(
        TableSchema.builder()
            .field("type", STRING())
            .field("location", ROW(
                FIELD("id", STRING()),
                // (1)
                FIELD("lastUpdateTime", BIGINT())
                // (2)
                // FIELD("lastUpdateTime", TIMESTAMP())
                // (3)
                // FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
            ))
            .build()
    ))
    .createTemporaryTable("message");
tEnv.toAppendStream(tEnv.from("message"), Row.class)
    .print();

Note that I tried (1) BIGINT(), (2) TIMESTAMP(), and (3) TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) for lastUpdateTime, with only one of the three FIELD lines active at a time.

(1) works fine, but later I can't use time-based operations like windowing.

(2) causes the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
    at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
    at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
    at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
    at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
    ... 38 more
(3) causes the following exception:

Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
    ... 7 more

Can I read such JSON messages with time information in Flink 1.10.1?

Thanks,

Dongwon