Thank you Fabian! We will try the approach that you suggest.

On Thu, Jun 6, 2019 at 1:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Yu,
>
> When you register a DataStream as a Table, you can create a new attribute
> that contains the event timestamp of the DataStream records.
> For that, you would need to assign timestamps and generate watermarks
> before registering the stream:
>
> FlinkKafkaConsumer kafkaConsumer =
>     new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);
>
> // create DataStream from Kafka consumer
> DataStream<CustomerOrders> orders = env.addSource(kafkaConsumer);
>
> // assign timestamps with a custom timestamp assigner & WM generator
> DataStream<CustomerOrders> ordersWithTS =
>     orders.assignTimestampsAndWatermarks(new YourTimestampAssigner());
>
> // register DataStream as Table with ts as a timestamp which is
> // automatically extracted (see [1] for how to map POJO fields and
> // [2] for timestamps)
> tableEnv.registerDataStream("custom_orders", ordersWithTS, "userName, ..., ts.rowtime");
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/common.html#mapping-of-data-types-to-table-schema
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion-1
>
> On Thu., Jun 6, 2019 at 08:48, Yu Yang <yuyan...@gmail.com> wrote:
>
>> Hi Jingsong,
>>
>> Thanks for the reply! The following is our code snippet for creating the
>> log stream. Our messages are in Thrift format. We use a customized
>> serializer for serializing/deserializing messages (see
>> https://github.com/apache/flink/pull/8067 for the implementation).
>> Given that, how shall we define a time attribute column? We'd like to
>> leverage the customized serializer to figure out column names as much as
>> possible.
>>
>> ThriftDeserializationSchema deserializationSchema =
>>     new ThriftDeserializationSchema(CustomerOrders.class, ThriftCodeGenerator.SCROOGE);
>>
>> FlinkKafkaConsumer kafkaConsumer =
>>     new FlinkKafkaConsumer("customer_orders", deserializationSchema, m10n05Properties);
>>
>> tableEnv.registerDataStream("orders", kafkaConsumer);
>>
>> Regards,
>> -Yu
>>
>> On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com> wrote:
>>
>>> Hi @Yu Yang:
>>>
>>> Time-based operations such as windows in both the Table API and SQL
>>> require information about the notion of time and its origin. Therefore,
>>> tables can offer logical time attributes for indicating time and
>>> accessing the corresponding timestamps in table programs. [1]
>>> This means a window can only be defined over a time attribute column.
>>> You need to define a rowtime in your source, like this (UserActionTime
>>> is a long field; you don't need to convert it to Timestamp):
>>>
>>> Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
>>>
>>> See more information in the document below:
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>>>
>>> Best, JingsongLee
>>>
>>> ------------------------------------------------------------------
>>> From: Yu Yang <yuyan...@gmail.com>
>>> Send Time: June 5, 2019 (Wednesday) 14:57
>>> To: user <user@flink.apache.org>
>>> Subject: can flink sql handle udf-generated timestamp field
>>>
>>> Hi,
>>>
>>> I am trying to use Flink SQL to do aggregation on a hopping window. In
>>> the data stream, we store the timestamp as a long. So I wrote a UDF
>>> 'FROM_UNIXTIME' to convert long to the Timestamp type.
>>>
>>> public static class TimestampModifier extends ScalarFunction {
>>>     public Timestamp eval(long t) {
>>>         return new Timestamp(t);
>>>     }
>>>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>>>         return Types.SQL_TIMESTAMP;
>>>     }
>>> }
>>>
>>> With the above UDF, I wrote the following query, and ran into
>>> "ProgramInvocationException: The main method caused an error: Window can
>>> only be defined over a time attribute column".
>>> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
>>> this experiment.
>>>
>>> My SQL query:
>>>
>>> select keyid, sum(value)
>>> from (
>>>     select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>>>     from orders)
>>> group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>>>
>>> Flink exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>>>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>     at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>> Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
>>>     at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
>>>     at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
>>>     at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
>>>     at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
>>>     at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>>>
>>> Regards,
>>> -Yu
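Editor's note: Fabian's reply refers to a custom timestamp assigner ("YourTimestampAssigner") without showing one. In Flink 1.8 a common choice is BoundedOutOfOrdernessTimestampExtractor. The sketch below reproduces its core idea in plain Java with no Flink dependency: track the maximum event timestamp seen and emit a watermark that trails it by the allowed lateness. The class name WatermarkSketch and the epoch-millis field are illustrative assumptions, not code from the thread.

```java
// Sketch of the bounded-out-of-orderness logic behind Flink 1.8's
// BoundedOutOfOrdernessTimestampExtractor (plain Java, no Flink dependency).
// Assumes each record carries an epoch-millis event timestamp, as Yu's
// timestampMs field does.
public class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    public WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called once per record: return its event timestamp and track the max seen.
    public long extractTimestamp(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
        return timestampMs;
    }

    // The watermark trails the max timestamp by the allowed out-of-orderness,
    // so late records within that bound still land in their windows.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        WatermarkSketch ws = new WatermarkSketch(5_000);
        ws.extractTimestamp(10_000);
        ws.extractTimestamp(8_000); // out-of-order record does not move the max
        System.out.println(ws.currentWatermark()); // 5000
    }
}
```

In the real API the same two pieces live in `extractTimestamp` and `getCurrentWatermark` of the assigner passed to `assignTimestampsAndWatermarks`.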
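Editor's note: once ts is registered as a rowtime attribute, the HOP window in Yu's query becomes legal. For intuition about what HOP(ts, INTERVAL '10' second, INTERVAL '30' second) groups together: with a 10-second slide and a 30-second size, each record falls into size/slide = 3 overlapping windows. The plain-Java sketch below (no Flink dependency; class and method names are illustrative, not Flink API) computes those window start times.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of hopping (sliding) window assignment: which windows of the given
// slide and size contain a record with event timestamp ts?
public class HopWindowSketch {
    // Returns the start timestamps (newest first) of all windows containing ts,
    // where each window covers [start, start + sizeMs).
    public static List<Long> windowStarts(long ts, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        // Floor ts to the nearest slide boundary (modulo kept non-negative).
        long lastStart = ts - ((ts % slideMs) + slideMs) % slideMs;
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at t = 25s with a 10s slide and 30s size falls into the
        // windows starting at 20s, 10s, and 0s.
        System.out.println(windowStarts(25_000, 10_000, 30_000)); // [20000, 10000, 0]
    }
}
```

The aggregation in the query then produces one sum per (window, keyid) pair, with each record counted in three window results.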