twalthr commented on code in PR #25357: URL: https://github.com/apache/flink/pull/25357#discussion_r1768266516
########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ########## @@ -144,7 +150,42 @@ private Stream<Arguments> createSpecs() { return getAllMetadata().stream() .flatMap( metadata -> - supportedPrograms().stream().map(p -> Arguments.of(p, metadata))); + supportedPrograms().stream() + .flatMap( + program -> + getSavepointPaths(program, metadata) + .map( + savepointPath -> + Arguments.of( + program, + getPlanPath( + program, + metadata), + savepointPath)))); + } + + /** + * The method can be overridden in a subclass to test multiple savepoint files for a given + * program and a node in a particular version. This can be useful e.g. to test a node against + * savepoint generated in different Flink versions. + */ + protected Stream<String> getSavepointPaths( + TableTestProgram program, ExecNodeMetadata metadata) { + return Stream.of(getSavepointPath(program, metadata, null).toString()); + } + + protected final Path getSavepointPath( Review Comment: nit: return `String` to match with `getSavepointPaths`. also avoid the additional toString in the `OverWindowRestoreTest ` ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/RestoreTestBase.java: ########## @@ -144,7 +150,42 @@ private Stream<Arguments> createSpecs() { return getAllMetadata().stream() .flatMap( metadata -> - supportedPrograms().stream().map(p -> Arguments.of(p, metadata))); + supportedPrograms().stream() + .flatMap( + program -> + getSavepointPaths(program, metadata) + .map( + savepointPath -> + Arguments.of( + program, + getPlanPath( + program, + metadata), + savepointPath)))); + } + + /** + * The method can be overridden in a subclass to test multiple savepoint files for a given + * program and a node in a particular version. This can be useful e.g. to test a node against + * savepoint generated in different Flink versions. + */ + protected Stream<String> getSavepointPaths( + TableTestProgram program, ExecNodeMetadata metadata) { + return Stream.of(getSavepointPath(program, metadata, null).toString()); + } + + protected final Path getSavepointPath( Review Comment: the `Paths.get` can be done in `generateTestSetupFiles ` ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java: ########## @@ -314,7 +314,8 @@ public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCode types = new LogicalType[length]; for (int i = 0; i < length; i++) { try { - types[i] = InstantiationUtil.deserializeObject(stream, userCodeClassLoader); + types[i] = + InstantiationUtil.deserializeObject(stream, userCodeClassLoader, true); Review Comment: Is RowDataSerializer enough? Wouldn't it be safer to do it for all constructed types? So MapDataSerializer and ArrayDataSerializer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org