Hi Jingsong,

Thanks for the reply! Below is our code snippet for creating the
log stream. Our messages are in Thrift format, and we use a customized
serializer for serializing/deserializing them (see
https://github.com/apache/flink/pull/8067 for the implementation). Given
that, how should we define a time attribute column? We'd like to leverage
the customized serializer to figure out column names as much as possible.

    ThriftDeserializationSchema deserializationSchema =
        new ThriftDeserializationSchema(
            CustomerOrders.class, ThriftCodeGenerator.SCROOGE);

    FlinkKafkaConsumer kafkaConsumer =
        new FlinkKafkaConsumer(
            "customer_orders", deserializationSchema, m10n05Properties);

    tableEnv.registerDataStream("orders", kafkaConsumer);

Regards,
-Yu

On Wed, Jun 5, 2019 at 11:15 PM JingsongLee <lzljs3620...@aliyun.com> wrote:

> Hi @Yu Yang:
> Time-based operations such as windows in both the Table API and SQL require
> information about the notion of time and its origin. Therefore, tables can
> offer logical time attributes for indicating time and accessing corresponding
> timestamps in table programs. [1]
> This means a window can only be defined over a time attribute column.
> You need to define a rowtime in your source, like this (UserActionTime is a
> long field; you don't need to convert it to Timestamp):
>
> Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");
>
> See the document below for more information:
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/time_attributes.html#event-time
>
> Best, JingsongLee
>
> ------------------------------------------------------------------
> From: Yu Yang <yuyan...@gmail.com>
> Sent: Wednesday, June 5, 2019, 14:57
> To: user <user@flink.apache.org>
> Subject: can flink sql handle udf-generated timestamp field
>
> Hi,
>
> I am trying to use Flink SQL to aggregate over a hopping window. In the
> data stream, the timestamp is stored as a long, so I wrote a UDF
> 'FROM_UNIXTIME' to convert the long to a Timestamp.
>
>   public static class TimestampModifier extends ScalarFunction {
>     public Timestamp eval(long t) {
>       return new Timestamp(t);
>     }
>     public TypeInformation<?> getResultType(Class<?>[] signature) {
>       return Types.SQL_TIMESTAMP;
>     }
>   }
>
> With the above UDF, I wrote the following query, and ran into
>  "ProgramInvocationException: The main method caused an error: Window can
> only be defined over a time attribute column".
> Any suggestions on how to resolve this issue? I am using Flink 1.8 for
> this experiment.
>
> my sql query:
>
> select keyid, sum(value)
> from (
>    select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
>    from orders)
> group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
>
> flink exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
> at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
> at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
> at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
>
> Regards,
> -Yu
>
>
>
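A side note on the query quoted above: in HOP(ts, INTERVAL '10' second, INTERVAL '30' second), the first interval is the slide and the second is the window size, so each row falls into three overlapping windows. The plain-Java sketch below illustrates that assignment for a timestamp given in epoch milliseconds. It is not Flink code: the class name HopWindowSketch and the method windowStarts are made up for illustration, and it assumes windows aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindowSketch {

    /**
     * Returns the start times (epoch ms) of every hopping window
     * [start, start + sizeMs) that contains timestampMs, newest first.
     * Assumes windows are aligned to the epoch (no offset).
     */
    static List<Long> windowStarts(long timestampMs, long slideMs, long sizeMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long slideMs = 10_000L; // INTERVAL '10' second
        long sizeMs = 30_000L;  // INTERVAL '30' second
        long timestampMs = 1_559_772_905_000L; // arbitrary example value
        for (long start : windowStarts(timestampMs, slideMs, sizeMs)) {
            System.out.println("[" + start + ", " + (start + sizeMs) + ")");
        }
    }
}
```

With a 10-second slide and a 30-second size, every timestamp lands in size / slide = 3 windows, which is why the aggregation emits one row per key for each of those windows.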
