Hi Yu,

When you register a DataStream as a Table, you can create a new attribute that contains the event timestamp of the DataStream records. For that, you need to assign timestamps and generate watermarks before registering the stream:
FlinkKafkaConsumer kafkaConsumer =
    new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);

// create DataStream from Kafka consumer
DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);

// assign timestamps with a custom timestamp assigner & watermark generator
DataStream<CustomerOrders> ordersWithTS =
    orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());

// register the DataStream as a Table; ts is automatically extracted as the
// event-time attribute (see [1] for how POJO fields are mapped to the table
// schema and [2] for time attributes)
tableEnv.registerDataStream("customer_orders", ordersWithTS, "userName, ..., ts.rowtime");

Hope this helps,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1

On Thu, Jun 6, 2019 at 08:48, Yu Yang <yuyan...@gmail.com> wrote:

> Hi Jingsong,
>
> Thanks for the reply! The following is our code snippet for creating the
> log stream. Our messages are in Thrift format. We use a customized
> serializer for serializing/deserializing messages (see
> https://github.com/apache/flink/pull/8067 for the implementation). Given
> that, how shall we define a time attribute column? We'd like to leverage
> the customized serializer to figure out column names as much as possible.
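[Editor's note: `YourTimestampAssigner` above is a placeholder; in Flink 1.8 a common choice is to extend `BoundedOutOfOrdernessTimestampExtractor`. The core logic such an assigner implements can be sketched in plain Java, with no Flink dependency; the class and field names here are illustrative only:

```java
// Sketch of bounded-out-of-orderness watermarking: the watermark trails the
// highest event timestamp seen so far by a fixed delay, so records that
// arrive at most that much late still land in their event-time window.
public class BoundedOutOfOrderness {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    public BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // called once per record with its event timestamp (epoch millis)
    public long extractTimestamp(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
        return eventTimestampMs;
    }

    // watermark = "no record with a timestamp <= this is expected anymore"
    public long currentWatermark() {
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderness wm = new BoundedOutOfOrderness(5000L);
        wm.extractTimestamp(10000L);
        wm.extractTimestamp(8000L); // a late record does not move the watermark back
        System.out.println(wm.currentWatermark()); // 5000
    }
}
```

The trade-off in picking the delay: a larger value tolerates more disorder but delays window results by the same amount.]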
>
> ThriftDeserializationSchema deserializationSchema =
>     new ThriftDeserializationSchema(CustomerOrders.class, ThriftCodeGenerator.SCROOGE);
>
> FlinkKafkaConsumer kafkaConsumer =
>     new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);
>
> tableEnv.registerDataStream("orders", kafkaConsumer);
>
> Regards,
> -Yu
>
> On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com> wrote:
>
>> Hi @Yu Yang:
>>
>> Time-based operations such as windows in both the Table API and SQL
>> require information about the notion of time and its origin. Therefore,
>> tables can offer logical time attributes for indicating time and
>> accessing corresponding timestamps in table programs. [1]
>> This means a window can only be defined over a time attribute column.
>> You need to define a rowtime in your source, like this (UserActionTime
>> is a long field; you don't need to convert it to Timestamp):
>>
>> Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
>>
>> See the document below for more information:
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>>
>> Best, JingsongLee
>>
>> ------------------------------------------------------------------
>> From: Yu Yang <yuyan...@gmail.com>
>> Send Time: Wednesday, June 5, 2019, 14:57
>> To: user <user@flink.apache.org>
>> Subject: can flink sql handle udf-generated timestamp field
>>
>> Hi,
>>
>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>> the data stream, we store the timestamp as a long. So I wrote a UDF
>> 'FROM_UNIXTIME' to convert long to the Timestamp type.
>>
>> public static class TimestampModifier extends ScalarFunction {
>>   public Timestamp eval(long t) {
>>     return new Timestamp(t);
>>   }
>>   public TypeInformation<?> getResultType(Class<?>[] signature) {
>>     return Types.SQL_TIMESTAMP;
>>   }
>> }
>>
>> With the above UDF, I wrote the following query and ran into
>> "ProgramInvocationException: The main method caused an error: Window can
>> only be defined over a time attribute column".
>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>> this experiment.
>>
>> my sql query:
>>
>> select keyid, sum(value)
>> from (
>>   select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>   from orders)
>> group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>>
>> flink exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Window can only be defined over a time attribute
>> column.
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>>   at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>   at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>   at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>   at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>   at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>> Caused by: org.apache.flink.table.api.ValidationException: Window can
>> only be defined over a time attribute column.
>>   at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>>   at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>>   at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>>   at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>   at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>>   at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>
>> Regards,
>> -Yu
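[Editor's note: for readers following the HOP semantics in the query above — with a slide of 10 seconds and a size of 30 seconds, each record belongs to size/slide = 3 overlapping windows. A standalone plain-Java sketch of that window assignment (not Flink code; method and class names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindows {
    // Returns the start timestamps (epoch millis) of every hopping window
    // that contains the given event timestamp, newest window first.
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // latest window starting at or before the timestamp
        long lastStart = timestampMs - (timestampMs % slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            if (start >= 0) {
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        // event at t=25s with HOP(slide 10s, size 30s)
        // -> windows starting at 20s, 10s, 0s
        System.out.println(windowStarts(25000L, 30000L, 10000L));
    }
}
```

This is why the aggregation needs a proper event-time attribute: each incoming record updates several concurrently open windows, and only the watermark tells Flink when a window's result is final.]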