Hi Jingsong,

Thanks for the reply! The following is our code snippet for creating the log stream. Our messages are in Thrift format. We use a customized serializer for serializing/deserializing messages (see https://github.com/apache/flink/pull/8067 for the implementation). Given that, how shall we define a time attribute column? We'd like to leverage the customized serializer to figure out column names as much as possible.

ThriftDeserializationSchema deserializationSchema =
    new ThriftDeserializationSchema(CustomerOrders.class, ThriftCodeGenerator.SCROOGE);
FlinkKafkaConsumer kafkaConsumer =
    new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);
tableEnv.registerDataStream("orders", kafkaConsumer);

Regards,
-Yu

On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com> wrote:

> Hi @Yu Yang:
>
> Time-based operations such as windows in both the Table API and SQL
> require information about the notion of time and its origin. Therefore,
> tables can offer logical time attributes for indicating time and
> accessing corresponding timestamps in table programs.[1]
> This means a window can only be defined over a time attribute column.
> You need to define a rowtime in your source, like this (UserActionTime
> is a long field; you don't need to convert it to Timestamp):
>
> Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
>
> See the document below for more information:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From: Yu Yang <yuyan...@gmail.com>
> Send Time: June 5, 2019 (Wednesday) 14:57
> To: user <user@flink.apache.org>
> Subject: can flink sql handle udf-generated timestamp field
>
> Hi,
>
> I am trying to use Flink SQL to do aggregation on a hopping window. In
> the data stream, we store the timestamp in long type. So I wrote a UDF
> 'FROM_UNIXTIME' to convert long to Timestamp type.
>
> public static class TimestampModifier extends ScalarFunction {
>     public Timestamp eval(long t) {
>         return new Timestamp(t);
>     }
>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>         return Types.SQL_TIMESTAMP;
>     }
> }
>
> With the above UDF, I wrote the following query, and ran into
> "ProgramInvocationException: The main method caused an error: Window can
> only be defined over a time attribute column".
> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
> this experiment.
>
> my sql query:
>
> select keyid, sum(value)
> from (
>     select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>     from orders)
> group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>
> flink exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
>     at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>     at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>     at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>
> Regards,
> -Yu
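
For anyone following the thread: in the quoted query, HOP(ts, INTERVAL '10' second, INTERVAL '30' second) asks for 30-second windows that advance every 10 seconds, so each row falls into three overlapping windows. The snippet below is a rough sketch of that assignment arithmetic in plain Java with no Flink dependency. It assumes epoch-millisecond timestamps (like the long timestampMs field) and windows aligned to the epoch with no offset. The class and method names (HopWindows, windowStarts) are made up for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Flink class: lists the start times of all
// hopping windows [start, start + sizeMs) that contain a given timestamp.
class HopWindows {

    static List<Long> windowStarts(long timestampMs, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is <= timestampMs, aligned to the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers
        // the timestamp, i.e. while start + sizeMs > timestampMs.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        Collections.reverse(starts); // earliest window first
        return starts;
    }

    public static void main(String[] args) {
        // slide = 10 s, size = 30 s, as in the HOP call from the query
        System.out.println(windowStarts(25_000L, 10_000L, 30_000L)); // prints [0, 10000, 20000]
    }
}
```

A row with timestampMs = 25000 therefore contributes to the windows starting at 0, 10000 and 20000, and the planner can only compute these per-window sums once it knows which column carries event time, which is why it insists on a time attribute rather than a UDF result.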