Hi team,
I'm investigating Flink batch mode using the Table API, and I found that an
OVER clause that runs in streaming mode fails in batch mode. According to the
stack trace, the generated RangeBoundComparator class fails to compile.
I've attached the complete error at the end. I look forward to your response.
Thanks!
-----------SQL Start-------------
CREATE TABLE input_table (
  name STRING,
  age INT,
  address STRING,
  eventTime BIGINT,
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventTime / 1000, 'yyyy-MM-dd HH:mm:ss'))
) WITH (
  'connector' = 'filesystem',
  'path' = 'flink-batch-local-input-file.csv',
  'format' = 'csv'
);
CREATE TABLE output_table (
  name STRING,
  age INT,
  address STRING,
  eventTime BIGINT,
  ts TIMESTAMP(3),
  num BIGINT
) WITH (
  'connector' = 'filesystem',
  'path' = '=flink-batch-local-output-file.csv',
  'format' = 'csv'
);
INSERT INTO output_table
SELECT *, COUNT(name) OVER w AS num
FROM input_table
WINDOW w AS (
  PARTITION BY `name`
  ORDER BY ts
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
);
-----------SQL End-------------
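Note for reproducing: the statements above only fail when the session runs in
batch mode. A minimal sketch of that switch, assuming the SQL client is used
(this is an assumption about an equivalent setup, not the exact Table API
configuration of the job described here), would be:
-----------SQL Start-------------
-- Assumed SQL client session setting; run it before the statements above.
SET 'execution.runtime-mode' = 'batch';
-----------SQL End-------------
With 'streaming' in place of 'batch', the same statements should correspond to
the streaming-mode run mentioned at the top.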
-----------Error Start-------------
Caused by: java.lang.RuntimeException: Could not instantiate generated class 'RangeBoundComparator$48'
Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
Caused by: org.codehaus.commons.compiler.CompileException: Line 45, Column 66: Cannot cast "org.apache.flink.table.data.TimestampData" to "java.lang.Long"
-----------Error End-------------
Thanks,
Matthew