Okay, thanks a lot for your input.

I just tried out the Flink SQL client and wanted to store pre-defined YAML
files, each declaring a source table from a Kafka topic. As you advised, I
have to manually enter the DDL in the SQL client on Flink 1.11.x.

Best,

Dongwon

On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Kim
>
>> Hi Leonard,
>>
>> Can I have a YAML definition corresponding to the DDL you suggested?
>
> Unfortunately the answer is no. The YAML you defined is parsed by the
> Table API and then executed, and the root cause of the error you posted is
> that the Table API does not support computed columns yet; there is a FLIP
> under discussion [1], which should be ready in 1.12.0. BTW, I think DDL is
> the recommended way since Flink 1.11.0.
>
> Best,
> Leonard Xu
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On Jul 20, 2020, at 14:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> I tried the YAML below (Flink 1.11.0) but got an error:
>>
>> tables:
>>   - name: test
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: universal
>>       topic: ...
>>       properties:
>>         bootstrap.servers: ...
>>         group.id: ...
>>     format:
>>       property-version: 1
>>       type: json
>>     schema:
>>       - name: type
>>         data-type: STRING
>>       - name: location
>>         data-type: >
>>           ROW<
>>             id STRING,
>>             lastUpdateTime BIGINT
>>           >
>>       - name: timestampCol
>>         data-type: TIMESTAMP(3)
>>         rowtime:
>>           timestamps:
>>             type: from-field
>>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>>           watermarks:
>>             type: periodic-bounded
>>             delay: 5000
>>
>> The SQL client doesn't complain about the file, but when I execute
>> "SELECT timestampCol FROM test", the job fails with the following error
>> message:
>>
>> Caused by: java.lang.NullPointerException
>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>>   at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>   at SourceConversion$4.processElement(Unknown Source)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
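[For concreteness, a minimal sketch of what "manually entering the DDL" could
look like for the table defined in the YAML above, assuming the Flink 1.11
'kafka' SQL connector and JSON format; the topic and connection values are
placeholders carried over from the YAML, not real settings.]

  CREATE TABLE test (
    `type` STRING,
    `location` ROW<`id` STRING, `lastUpdateTime` BIGINT>,
    -- computed column: epoch milliseconds -> TIMESTAMP(3)
    timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
    -- same 5-second bounded delay as the YAML's periodic-bounded watermark
    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
  ) WITH (
    'connector' = 'kafka',                    -- Flink 1.11 (FLIP-122) option keys
    'topic' = '...',                          -- placeholder, as in the YAML
    'properties.bootstrap.servers' = '...',   -- placeholder
    'properties.group.id' = '...',            -- placeholder
    'format' = 'json'
  );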
>
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Leonard,
>>
>> Wow, that's great! It works like a charm.
>> I'd never considered this approach at all.
>> Thanks a lot.
>>
>> Best,
>> Dongwon
>>
>> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Kim
>>>
>>> The reason your attempts (2) and (3) failed is that the JSON format
>>> does not support converting a BIGINT to TIMESTAMP. You can first define
>>> the BIGINT field and then use a computed column to derive a TIMESTAMP
>>> field; you can also define the time attribute on the TIMESTAMP field to
>>> use time-based operations in Flink 1.10.1. But computed columns are only
>>> supported in pure DDL; the Table API lacks this support and should be
>>> aligned in 1.12 as far as I know.
>>> The DDL syntax is as follows:
>>>
>>> create table test (
>>>   `type` STRING,
>>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>>   timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>>>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>>> ) with (
>>>   'connector' = '...',
>>>   'format' = 'json',
>>>   ...
>>> );
>>>
>>> Best,
>>> Leonard Xu
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
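[To make that computed-column conversion concrete, a quick sanity check one
could run as a standalone query; a sketch only. The sample value is the
lastUpdateTime from the JSON message quoted below, and the rendered string
assumes a UTC session time zone, since FROM_UNIXTIME formats the epoch in the
session time zone.]

  SELECT TO_TIMESTAMP(FROM_UNIXTIME(1593866161436 / 1000, 'yyyy-MM-dd HH:mm:ss')) AS ts;
  -- BIGINT division truncates: 1593866161436 / 1000 = 1593866161 (seconds)
  -- FROM_UNIXTIME(1593866161, ...) formats it as '2020-07-04 12:36:01' (assuming UTC)
  -- TO_TIMESTAMP then parses that string into a TIMESTAMP(3) usable as a rowtime attribute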
>>>
>>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I use Flink 1.10.1 and I want to use the Table API to read JSON
>>> messages. The messages look like the one below.
>>>
>>>> {
>>>>   "type": "Update",
>>>>   "location": {
>>>>     "id": "123e4567-e89b-12d3-a456-426652340000",
>>>>     "lastUpdateTime": 1593866161436
>>>>   }
>>>> }
>>>
>>> I wrote the following program just to see whether JSON messages are
>>> correctly parsed by the Table API:
>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>> EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
>>>>     .useBlinkPlanner()
>>>>     .inStreamingMode()
>>>>     .build();
>>>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>>>> tEnv
>>>>   .connect(
>>>>     new Kafka()
>>>>       .version("universal")
>>>>       .topic(consumerTopic)
>>>>       .startFromLatest()
>>>>       .properties(consumerProperties)
>>>>   )
>>>>   .withFormat(new Json())
>>>>   .withSchema(new Schema().schema(
>>>>     TableSchema.builder()
>>>>       .field("type", STRING())
>>>>       .field("location",
>>>>         ROW(
>>>>           FIELD("id", STRING()),
>>>>           // (1)
>>>>           FIELD("lastUpdateTime", BIGINT())
>>>>           // (2)
>>>>           FIELD("lastUpdateTime", TIMESTAMP())
>>>>           // (3)
>>>>           FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>>>         ))
>>>>       .build()
>>>>   ))
>>>>   .createTemporaryTable("message");
>>>> tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>>>   .print();
>>>
>>> Note that I tried BIGINT(), TIMESTAMP(), and
>>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) in turn, as marked by
>>> (1), (2), and (3) above.
>>>
>>> (1) works fine, but later I can't use time-based operations like
>>> windowing.
>>>
>>> (2) causes the following exception:
>>>
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>>>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>>>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>>   at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>>   at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>>   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>>   at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>>   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>>   at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>>   at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>>   at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>>   at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>>   at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>>   at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>>   at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>>   at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>>>   ... 38 more
>>>
>>> (3) causes the following exception:
>>>
>>>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
>>>>   at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>>   at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>>   at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>>   ... 7 more
>>>
>>> Can I read such JSON messages with time information in Flink 1.10.1?
>>>
>>> Thanks,
>>>
>>> Dongwon
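[Putting Leonard's answer together for Flink 1.10.1, a sketch of the complete
DDL: his snippet with the connector section filled in using the legacy,
pre-FLIP-122 property keys that 1.10 understands. The topic and addresses are
placeholders, and 'format.derive-schema' is included on the assumption that
the JSON fields should be derived from the table schema.]

  CREATE TABLE message (
    `type` STRING,
    -- keep the physical field as BIGINT, matching what the JSON format can parse
    `location` ROW<`id` STRING, `lastUpdateTime` BIGINT>,
    -- computed column turns the epoch-millis field into a TIMESTAMP(3)
    timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
  ) WITH (
    'connector.type' = 'kafka',                        -- legacy 1.10 property keys
    'connector.version' = 'universal',
    'connector.topic' = '...',                         -- placeholder
    'connector.properties.bootstrap.servers' = '...',  -- placeholder
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'                    -- assumption: derive JSON schema from the table schema
  );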