Hi,
I am trying to use Flink SQL to aggregate over a hopping window. The records
in our data stream carry their timestamp as a long, so I wrote a UDF,
'FROM_UNIXTIME', to convert the long to a Timestamp:
    public static class TimestampModifier extends ScalarFunction {
        public Timestamp eval(long t) {
            return new Timestamp(t);
        }
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }
With the above UDF, I wrote the following query, and ran into
"ProgramInvocationException: The main method caused an error: Window can
only be defined over a time attribute column".
Any suggestions on how to resolve this issue? I am using Flink 1.8 for this
experiment.
My SQL query:
    select keyid, sum(value)
    from (
        select FROM_UNIXTIME(timestampMs) as ts, key.id as keyid, value
        from orders)
    group by HOP(ts, INTERVAL '10' second, INTERVAL '30' second), keyid
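To make the window semantics I expect from the HOP clause concrete, here is a minimal plain-Java sketch of which 30-second windows, sliding every 10 seconds, a millisecond timestamp should fall into. It is only an illustration, not Flink's implementation: the class name HopWindows and the method windowStarts are made up for this example, and it assumes windows are aligned to the epoch with no offset.

```java
import java.util.ArrayList;
import java.util.List;

public class HopWindows {
    /**
     * Returns the start times (epoch ms) of every hopping window
     * [start, start + sizeMs) that contains timestampMs.
     * Assumption: windows are aligned to the epoch, with no offset.
     */
    public static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Step backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Slide of 10 s and size of 30 s, matching the HOP clause in the query.
        for (long start : windowStarts(25_000L, 30_000L, 10_000L)) {
            System.out.println("[" + start + ", " + (start + 30_000L) + ")");
        }
    }
}
```

With a 10-second slide and a 30-second size, each record should land in three overlapping windows, so every order would contribute to three sums per key.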
Flink exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.getOperandAsTimeIndicator$1(DataStreamLogicalWindowAggregateRule.scala:85)
    at org.apache.flink.table.plan.rules.datastream.DataStreamLogicalWindowAggregateRule.translateWindowExpression(DataStreamLogicalWindowAggregateRule.scala:99)
    at org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule.onMatch(LogicalWindowAggregateRule.scala:66)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:559)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:418)
Regards,
-Yu