Hi Leonard,

> Unfortunately the answer is no. The YAML you defined will be parsed by the
> Table API and then executed; the root cause of the error you posted is that
> the Table API does not support computed columns yet.
> There is a FLIP under discussion[1], which should be ready in 1.12.0. BTW,
> I think DDL is the recommended way since Flink 1.11.0.

Okay, thanks a lot for your input.

I just tried out the Flink SQL client and wanted to keep predefined YAML
files, each declaring a source table backed by a Kafka topic.
As you advised, I will have to enter the DDL manually in the SQL client on
Flink 1.11.x.

Best,

Dongwon


On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Hi, Kim
>
> Hi Leonard,
>
> Can I have a YAML definition corresponding to the DDL you suggested?
>
>
> Unfortunately the answer is no. The YAML you defined will be parsed by the
> Table API and then executed; the root cause of the error you posted is that
> the Table API does not support computed columns yet.
>
> There is a FLIP under discussion[1], which should be ready in 1.12.0. BTW,
> I think DDL is the recommended way since Flink 1.11.0.
>
> Best,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On Jul 20, 2020, at 14:30, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>
> I tried below (Flink 1.11.0) but got some error:
>
>> tables:
>>   - name: test
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: universal
>>       topic: ...
>>       properties:
>>         bootstrap.servers: ...
>>         group.id: ...
>>     format:
>>       property-version: 1
>>       type: json
>>     schema:
>>       - name: type
>>         data-type: STRING
>>       - name: location
>>         data-type: >
>>           ROW<
>>             id STRING,
>>             lastUpdateTime BIGINT
>>           >
>>       - name: timestampCol
>>         data-type: TIMESTAMP(3)
>>         rowtime:
>>           timestamps:
>>             type: from-field
>>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>>           watermarks:
>>             type: periodic-bounded
>>             delay: 5000
>>
>
> SQL client doesn't complain about the file but, when I execute "SELECT
> timestampCol from test", the job fails with the following error message:
>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>> at
>> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$4.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Leonard,
>>
>> Wow, that's great! It works like a charm.
>> I've never considered this approach at all.
>> Thanks a lot.
>>
>> Best,
>> Dongwon
>>
>> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjt...@gmail.com> wrote:
>>
>>> Hi, Kim
>>>
>>> Your attempts (2) and (3) failed because the json format does not support
>>> converting a BIGINT to a TIMESTAMP. You can first declare the field as
>>> BIGINT and then use a computed column to derive a TIMESTAMP field; you can
>>> also define the time attribute on that TIMESTAMP field to use time-based
>>> operations in Flink 1.10.1. Note that computed columns are only supported
>>> in pure DDL; the Table API lacks this support, which should be aligned in
>>> 1.12 as far as I know.
>>> The DDL syntax is as follows:
>>>
>>> create table test (
>>>   `type` STRING,
>>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>>   timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
>>>   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>>> ) with (
>>>   'connector' = '...',
>>>   'format' = 'json',
>>>   ...
>>> );
>>>
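For anyone following the DDL above, here is a minimal Java sketch of what the computed column evaluates to for the sample message. It uses plain java.time and is not Flink's implementation. The class and method names are made up for illustration, and UTC is an assumed time zone, so the value Flink produces depends on its own time zone setting.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that imitates the computed column's two steps.
final class EpochMillisToTimestamp {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Imitates FROM_UNIXTIME(millis / 1000, 'yyyy-MM-dd HH:mm:ss'):
    // epoch seconds rendered as text in the given (assumed) zone.
    static String fromUnixtime(long epochMillis, ZoneId zone) {
        long epochSeconds = epochMillis / 1000; // integer division drops the milliseconds
        return LocalDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), zone)
                .format(FORMAT);
    }

    // Imitates TO_TIMESTAMP(text): parses the text back into a timestamp.
    static LocalDateTime toTimestamp(String text) {
        return LocalDateTime.parse(text, FORMAT);
    }

    public static void main(String[] args) {
        long lastUpdateTime = 1593866161436L; // value from the sample JSON message
        String text = fromUnixtime(lastUpdateTime, ZoneOffset.UTC);
        System.out.println(text);              // 2020-07-04 12:36:01
        System.out.println(toTimestamp(text)); // 2020-07-04T12:36:01
    }
}
```

Note that the division by 1000 discards the millisecond part (436 here), so the resulting timestamp only has second resolution.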
>>>
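The WATERMARK clause in the DDL above declares a bounded delay of 5 seconds, which I understand to mean that the watermark trails the largest timestamp seen so far by that delay. A rough sketch of that arithmetic in plain Java follows. The class name is hypothetical and this is only an illustration of the idea, not Flink's watermark generator.

```java
// Hypothetical illustration of a bounded-delay watermark:
// watermark = maximum event timestamp seen so far - fixed delay.
final class BoundedDelayWatermark {
    private final long delayMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedDelayWatermark(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Records an event timestamp and returns the watermark after seeing it.
    long onEvent(long eventTimestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMillis);
        return maxTimestampSeen - delayMillis;
    }

    public static void main(String[] args) {
        BoundedDelayWatermark watermark = new BoundedDelayWatermark(5_000L);
        System.out.println(watermark.onEvent(10_000L)); // 5000
        System.out.println(watermark.onEvent(8_000L));  // 5000, a late event does not move the watermark back
        System.out.println(watermark.onEvent(12_000L)); // 7000
    }
}
```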
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>>>
>>>
>>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>> I use Flink 1.10.1 and I want to use Table API to read JSON messages.
>>> The message looks like below.
>>>
>>>>     {
>>>>        "type":"Update",
>>>>        "location":{
>>>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>>>           "lastUpdateTime":1593866161436
>>>>        }
>>>>     }
>>>
>>>
>>> I wrote the following program just to see whether json messages are
>>> correctly parsed by Table API:
>>>
>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>     EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>>>>     tEnv
>>>>       .connect(
>>>>         new Kafka()
>>>>           .version("universal")
>>>>           .topic(consumerTopic)
>>>>           .startFromLatest()
>>>>           .properties(consumerProperties)
>>>>       )
>>>>       .withFormat(new Json())
>>>>       .withSchema(new Schema().schema(
>>>>         TableSchema.builder()
>>>>           .field("type", STRING())
>>>>           .field("location",
>>>>             ROW(
>>>>               FIELD("id", STRING()),
>>>>               // (1)
>>>>               FIELD("lastUpdateTime", BIGINT())
>>>>               // (2)
>>>>               FIELD("lastUpdateTime", TIMESTAMP())
>>>>               // (3)
>>>>               FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>>>             ))
>>>>           .build()
>>>>       ))
>>>>       .createTemporaryTable("message");
>>>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>>>       .print();
>>>
>>>
>>> Note that I tried BIGINT(), TIMESTAMP(), and
>>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>>> (1) it works fine but later I can't use time-based operations like
>>> windowing.
>>>
>>> (2) it causes the following exception
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>>>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>>>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>>>> the 'location' field of the TableSource return type.
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>> at
>>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>> at
>>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>> at
>>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at
>>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>> at
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Type
>>>> ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location'
>>>> does not match with the physical type ROW<`id` STRING, `lastUpdateTime`
>>>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>>> ... 38 more
>>>
>>>
>>> (3) it causes the following exception
>>>
>>>> Caused by: java.time.format.DateTimeParseException: Text
>>>> '1593868714814' could not be parsed at index 0
>>>> at
>>>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> ... 7 more
>>>
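The DateTimeParseException in (3) can be reproduced outside Flink. The sketch below assumes that the JSON format hands the raw text of the field to a java.time date-time formatter; I use ISO_LOCAL_DATE_TIME as a stand-in, which may differ from the exact formatter Flink uses, and the class name is hypothetical. A bare epoch-millisecond number is not a date-time string, so parsing is rejected.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical reproduction of the failure in attempt (3).
final class EpochTextParseFailure {
    // Returns true if the text parses as an ISO-8601 local date-time.
    static boolean parsesAsDateTime(String text) {
        try {
            LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false; // same exception type as in the stack trace above
        }
    }

    public static void main(String[] args) {
        // Epoch milliseconds as they appear in the JSON payload: rejected.
        System.out.println(parsesAsDateTime("1593868714814"));
        // A date-time string: accepted.
        System.out.println(parsesAsDateTime("2020-07-04T13:18:34.814"));
    }
}
```

This is why declaring the field as BIGINT and converting it afterwards works, while declaring it as TIMESTAMP directly does not.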
>>>
>>> Can I read such json messages with time information in Flink 1.10.1?
>>>
>>> Thanks
>>>
>>> Dongwon
>>>
>>>
>>>
>
