Best,
Danny Chan
---------- Forwarded message ----------
From: Danny Chan <yuzhao....@gmail.com>
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim <eastcirc...@gmail.com>
Subject: Re: [Table API] how to configure a nested timestamp field

> Or is it possible to pre-define a catalog there and register it through the SQL 
> CLI YAML?
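>
> As a rough illustration, a catalog entry pre-defined in the SQL Client YAML 
> could look like the following (the catalog name, type, and conf dir below are 
> assumptions, not taken from your setup):
>
> catalogs:
>   - name: my_catalog          # hypothetical catalog name
>     type: hive                # e.g. a Hive catalog
>     hive-conf-dir: /opt/hive-conf   # path is an assumption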
>
> Best,
> Danny Chan
> On Jul 20, 2020, 3:23 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > Hi Leonard,
> >
> > > Unfortunately the answer is no. The YAML you define is parsed by the Table 
> > > API and then executed, and the root cause of the error you posted is that 
> > > the Table API does not support computed columns yet;
> > > there is a FLIP under discussion [1], which should be ready in 1.12.0. BTW, 
> > > I think DDL is the recommended way since Flink 1.11.0.
> > Okay, thanks a lot for your input.
> >
> > I just tried out the Flink SQL client and wanted to store pre-defined YAML 
> > files, each declaring a source table from a Kafka topic.
> > As you advised, I have to manually enter DDL in the SQL client on Flink 
> > 1.11.x.
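> >
> > For reference, here is a minimal sketch of the DDL I would enter in the 1.11 
> > SQL client for the table above (connector option values are placeholders, and 
> > the exact option keys are my assumption based on the 1.11 Kafka connector):
> >
> > CREATE TABLE test (
> >   `type` STRING,
> >   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
> >   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000,
> >     'yyyy-MM-dd HH:mm:ss')),  -- computed column from the nested BIGINT
> >   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> > ) WITH (
> >   'connector' = 'kafka',
> >   'topic' = '...',
> >   'properties.bootstrap.servers' = '...',
> >   'properties.group.id' = '...',
> >   'format' = 'json'
> > );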
> >
> > Best,
> >
> > Dongwon
> >
> >
> > > On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xbjt...@gmail.com> wrote:
> > > > Hi, Kim
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > Can I have a YAML definition corresponding to the DDL you suggested?
> > > >
> > > > Unfortunately the answer is no. The YAML you define is parsed by 
> > > > the Table API and then executed, and the root cause of the error you 
> > > > posted is that the Table API does not support computed columns yet;
> > > >
> > > > there is a FLIP under discussion [1], which should be ready in 1.12.0. 
> > > > BTW, I think DDL is the recommended way since Flink 1.11.0.
> > > >
> > > > Best,
> > > > Leonard Xu
> > > > [1] 
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> > > >
> > > > > On Jul 20, 2020, at 14:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > > > >
> > > > >
> > > > > I tried the following (Flink 1.11.0) but got an error:
> > > > > > tables:
> > > > > >   - name: test
> > > > > >     type: source-table
> > > > > >     update-mode: append
> > > > > >     connector:
> > > > > >       property-version: 1
> > > > > >       type: kafka
> > > > > >       version: universal
> > > > > >       topic: ...
> > > > > >       properties:
> > > > > >         bootstrap.servers: ...
> > > > > >         group.id: ...
> > > > > >     format:
> > > > > >       property-version: 1
> > > > > >       type: json
> > > > > >     schema:
> > > > > >       - name: type
> > > > > >         data-type: STRING
> > > > > >       - name: location
> > > > > >         data-type: >
> > > > > >           ROW<
> > > > > >             id STRING,
> > > > > >             lastUpdateTime BIGINT
> > > > > >           >
> > > > > >       - name: timestampCol
> > > > > >         data-type: TIMESTAMP(3)
> > > > > >         rowtime:
> > > > > >           timestamps:
> > > > > >             type: from-field
> > > > > >             from: 
> > > > > > TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 
> > > > > > 'yyyy-MM-dd HH:mm:ss'))
> > > > > >           watermarks:
> > > > > >             type: periodic-bounded
> > > > > >             delay: 5000
> > > > >
> > > > > The SQL client doesn't complain about the file, but when I execute 
> > > > > "SELECT timestampCol FROM test", the job fails with the following 
> > > > > error message:
> > > > > > Caused by: java.lang.NullPointerException
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> > > > > > at 
> > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > > > > > at SourceConversion$4.processElement(Unknown Source)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> > > > > > at 
> > > > > > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> > > > > > at 
> > > > > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> > > > > > at 
> > > > > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> > > > > > at 
> > > > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > > > > > at 
> > > > > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > > > > > at 
> > > > > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> > > > >
> > > > > > On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> 
> > > > > > wrote:
> > > > > > > Hi Leonard,
> > > > > > >
> > > > > > > Wow, that's great! It works like a charm.
> > > > > > > I've never considered this approach at all.
> > > > > > > Thanks a lot.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongwon
> > > > > > >
> > > > > > > > On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> 
> > > > > > > > wrote:
> > > > > > > > > Hi, Kim
> > > > > > > > >
> > > > > > > > > The reason your attempts (2) and (3) failed is that the json 
> > > > > > > > > format does not support converting a BIGINT to TIMESTAMP. You 
> > > > > > > > > can first define the BIGINT field and then use a computed 
> > > > > > > > > column to extract a TIMESTAMP field; you can also define the 
> > > > > > > > > time attribute on the TIMESTAMP field for using time-based 
> > > > > > > > > operations in Flink 1.10.1. But computed columns are only 
> > > > > > > > > supported in pure DDL; the Table API lacks the support and 
> > > > > > > > > should be aligned in 1.12 as far as I know.
> > > > > > > > > The DDL syntax is as follows:
> > > > > > > > >
> > > > > > > > > create table test (
> > > > > > > > >   `type` STRING,
> > > > > > > > >   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
> > > > > > > > >    timestampCol as 
> > > > > > > > > TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 
> > > > > > > > > 'yyyy-MM-dd HH:mm:ss')), -- computed column
> > > > > > > > >    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' 
> > > > > > > > > SECOND
> > > > > > > > > ) with (
> > > > > > > > >   'connector' = '...',
> > > > > > > > >   'format' = 'json',
> > > > > > > > >   ...
> > > > > > > > > );
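> > > > > > > > >
> > > > > > > > > A minimal sketch of wiring this DDL into a Table API program on 
> > > > > > > > > 1.10.1 (assuming sqlUpdate is used to run the DDL; the WITH 
> > > > > > > > > options are placeholders):
> > > > > > > > >
> > > > > > > > > StreamExecutionEnvironment env =
> > > > > > > > >     StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > > > > > > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> > > > > > > > >     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
> > > > > > > > > // Register the table via DDL instead of the descriptor API,
> > > > > > > > > // because the computed column and watermark are DDL-only here.
> > > > > > > > > String ddl = "CREATE TABLE test (...)";  // the full statement above
> > > > > > > > > tEnv.sqlUpdate(ddl);
> > > > > > > > > tEnv.toAppendStream(
> > > > > > > > >         tEnv.sqlQuery("SELECT timestampCol FROM test"), Row.class)
> > > > > > > > >     .print();
> > > > > > > > > env.execute("test");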
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Leonard Xu
> > > > > > > > > [1] 
> > > > > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > I use Flink 1.10.1 and I want to use the Table API to read JSON 
> > > > > > > > > > messages. A message looks like the following:
> > > > > > > > > > >     {
> > > > > > > > > > >        "type":"Update",
> > > > > > > > > > >        "location":{
> > > > > > > > > > >           "id":"123e4567-e89b-12d3-a456-426652340000",
> > > > > > > > > > >           "lastUpdateTime":1593866161436
> > > > > > > > > > >        }
> > > > > > > > > > >     }
> > > > > > > > > >
> > > > > > > > > > I wrote the following program just to see whether JSON 
> > > > > > > > > > messages are correctly parsed by the Table API:
> > > > > > > > > > >     StreamExecutionEnvironment env = 
> > > > > > > > > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > > > >     
> > > > > > > > > > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > > > > > > > > > >     EnvironmentSettings envSettings = 
> > > > > > > > > > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > > > > > > > > > >     StreamTableEnvironment tEnv = 
> > > > > > > > > > > StreamTableEnvironment.create(env, envSettings);
> > > > > > > > > > >     tEnv
> > > > > > > > > > >       .connect(
> > > > > > > > > > >         new Kafka()
> > > > > > > > > > >           .version("universal")
> > > > > > > > > > >           .topic(consumerTopic)
> > > > > > > > > > >           .startFromLatest()
> > > > > > > > > > >           .properties(consumerProperties)
> > > > > > > > > > >       )
> > > > > > > > > > >       .withFormat(new Json())
> > > > > > > > > > >       .withSchema(new Schema().schema(
> > > > > > > > > > >         TableSchema.builder()
> > > > > > > > > > >           .field("type", STRING())
> > > > > > > > > > >           .field("location",
> > > > > > > > > > >             ROW(
> > > > > > > > > > >               FIELD("id", STRING()),
> > > > > > > > > > >               // (1)
> > > > > > > > > > >               FIELD("lastUpdateTime", BIGINT())
> > > > > > > > > > >               // (2)
> > > > > > > > > > >               FIELD("lastUpdateTime", TIMESTAMP())
> > > > > > > > > > >               // (3)
> > > > > > > > > > >               FIELD("lastUpdateTime", 
> > > > > > > > > > > TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
> > > > > > > > > > >             ))
> > > > > > > > > > >           .build()
> > > > > > > > > > >       ))
> > > > > > > > > > >       .createTemporaryTable("message");
> > > > > > > > > > >     tEnv.toAppendStream(tEnv.from("message"), Row.class)
> > > > > > > > > > >       .print();
> > > > > > > > > >
> > > > > > > > > > Note that I tried BIGINT(), TIMESTAMP(), and 
> > > > > > > > > > TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> > > > > > > > > > (1) works fine, but later I can't use time-based 
> > > > > > > > > > operations like windowing.
> > > > > > > > > >
> > > > > > > > > > (2) causes the following exception:
> > > > > > > > > > > Exception in thread "main" 
> > > > > > > > > > > org.apache.flink.table.api.ValidationException: Type 
> > > > > > > > > > > ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table 
> > > > > > > > > > > field 'location' does not match with the physical type 
> > > > > > > > > > > ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 
> > > > > > > > > > > 'location' field of the TableSource return type.
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> > > > > > > > > > > at 
> > > > > > > > > > > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > > > > > > > > > > at 
> > > > > > > > > > > scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> > > > > > > > > > > at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
> > > > > > > > > > > Caused by: 
> > > > > > > > > > > org.apache.flink.table.api.ValidationException: Type 
> > > > > > > > > > > ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table 
> > > > > > > > > > > field 'location' does not match with the physical type 
> > > > > > > > > > > ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 
> > > > > > > > > > > 'location' field of the TableSource return type.
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
> > > > > > > > > > > ... 38 more
> > > > > > > > > >
> > > > > > > > > > (3) causes the following exception:
> > > > > > > > > > > Caused by: java.time.format.DateTimeParseException: Text 
> > > > > > > > > > > '1593868714814' could not be parsed at index 0
> > > > > > > > > > > at 
> > > > > > > > > > > java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> > > > > > > > > > > at 
> > > > > > > > > > > java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at 
> > > > > > > > > > > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> > > > > > > > > > > ... 7 more
> > > > > > > > > >
> > > > > > > > > > Can I read such JSON messages with time information in 
> > > > > > > > > > Flink 1.10.1?
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > > > > Dongwon
> > > > > > > > >
> > > >
