[ https://issues.apache.org/jira/browse/FLINK-30511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
RocMarshal updated FLINK-30511:
-------------------------------
Description: 
* Code segment:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OnTimerDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.numberOfTaskSlots", "4");
        conf.setString("state.checkpoint-storage", "filesystem");
        conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
        conf.setString("execution.checkpointing.interval", "30s");
        //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        EnvironmentSettings envSetting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, envSetting);

        String sourceDDL = "CREATE TABLE orders (\n" +
                "  id INT,\n" +
                "  app INT,\n" +
                "  user_id STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'datagen',\n" +
                "  'rows-per-second' = '1',\n" +
                "  'fields.app.min' = '1',\n" +
                "  'fields.app.max' = '10',\n" +
                "  'fields.user_id.length' = '10'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        Table query = tableEnv.sqlQuery("select * from orders");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);

        TypeInformation<?>[] returnTypes = new TypeInformation[4];
        returnTypes[0] = Types.INT;
        returnTypes[1] = Types.INT; // Anchor-B:
        returnTypes[2] = Types.INT;
        returnTypes[3] = Types.INT;

        rowDataStream.keyBy(new KeySelector<Row, String>() {
            @Override
            public String getKey(Row value) throws Exception {
                return value.getFieldAs(2);
            }
        }).process(new KeyedProcessFunction<String, Row, Row>() {

            private Row firstRow;

            @Override
            public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                if (firstRow == null) {
                    firstRow = value;
                }
                ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                Row colRow = new Row(4);
                colRow.setField(0, 0);
                colRow.setField(1, 1);
                colRow.setField(2, 2);
                colRow.setField(3, 3);
                out.collect(colRow); // Anchor-C
            }
        }).name("TargetTestUDF")
                .returns(new RowTypeInfo(returnTypes))
                .print();

        env.execute(OnTimerDemo.class.getSimpleName());
    }
}
{code}
* Reproduction steps:
** Run the job without any state.
** Record the latest available checkpoint path as 'checkpoint-path-a'.
** Stop the job.
** Fill the real value of 'checkpoint-path-a' into the 'Anchor-A' line and un-comment it.
** Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' at the 'Anchor-B' line.
** Set a breakpoint in the 'StreamTask#handleAsyncException' method.
** Run the job again. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' thrown at the 'Anchor-C' line is silently dropped in 'StreamTask#handleAsyncException'.
** As a result, the framework never surfaces this exception in this scenario (a standalone sketch of the underlying type mismatch follows this list).
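For reference, the ClassCastException itself is the plain type mismatch between the declared RowTypeInfo and the actual field value: after the Anchor-B change, the framework serializes field 1 with a Long serializer while 'onTimer' still emits an Integer. Below is a minimal standalone sketch of just that mismatch, assuming Flink 1.16's serializer APIs ('RowTypeInfo#createSerializer', 'DataOutputSerializer'); the class name 'CastFailureSketch' is illustrative and not part of the repro job:
{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.types.Row;

public class CastFailureSketch {

    public static void main(String[] args) throws Exception {
        // Declared type says field 1 is LONG, mirroring the Anchor-B change ...
        RowTypeInfo declared = new RowTypeInfo(Types.INT, Types.LONG, Types.INT, Types.INT);
        TypeSerializer<Row> serializer = declared.createSerializer(new ExecutionConfig());

        // ... but the row carries an Integer in field 1, like the row built in onTimer (Anchor-C).
        Row row = new Row(4);
        row.setField(0, 0);
        row.setField(1, 1); // Integer, not Long
        row.setField(2, 2);
        row.setField(3, 3);

        // Throws: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
        serializer.serialize(row, new DataOutputSerializer(64));
    }
}
{code}
In the repro job, the exception raised inside 'onTimer' is routed to the processing-time service's async exception handler, which is why it surfaces in 'StreamTask#handleAsyncException' rather than in the main task loop.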
* Root cause:
** !截屏2022-12-27 18.51.12.png!
** When the job is started from state data, 'Task#restoreAndInvoke' is called. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' is dropped in the 'handleAsyncException' method shown above instead of being caught by the catch-block of 'Task#restoreAndInvoke'.
!截屏2022-12-27 19.20.00.png!
Could this be considered missing exception handling in the framework?
If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException', which matches the intention of 'Task#restoreAndInvoke'. A sketch of that direction follows. Thank you.
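For reference, a minimal sketch of that proposal, based on the shape of 'StreamTask#handleAsyncException' in 1.16 (the else-branch is illustrative rather than an actual patch, and whether a re-thrown exception actually reaches the catch-block of 'Task#restoreAndInvoke' depends on which thread invokes this callback):
{code:java}
// StreamTask (sketch only) -- the else-branch shows the proposed change.
@Override
public void handleAsyncException(String message, Throwable exception) {
    if (isRunning) {
        // current behavior: fail the task through the async exception handler,
        // but only if the task is already in the RUNNING phase
        asyncExceptionHandler.handleAsyncException(message, exception);
    } else {
        // proposed: do not silently drop exceptions raised while the task is
        // still restoring from state; re-throw so they can surface to the caller
        throw new RuntimeException(message, exception);
    }
}
{code}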
> Exception in user-timer Triggerable is ignored when recovering from state
> --------------------------------------------------------------------------
>
>                 Key: FLINK-30511
>                 URL: https://issues.apache.org/jira/browse/FLINK-30511
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.0
>        Environment: Flink 1.16.0
> java8
> deployment Mode: miniCluster in IDC; standalone, yarn-application.
>            Reporter: RocMarshal
>            Priority: Minor
>         Attachments: 截屏2022-12-27 18.51.12.png, 截屏2022-12-27 19.20.00.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)