Dear Apache Flink community,
I could use some help with a serialization issue I'm having while using the
Table API. Specifically, I'm trying to deserialize a serialized
CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
It seems that the CompiledPlan deserializer isn't looking up any functions
in the BuiltInFunctionDefinitions class, which is causing the
deserialization to fail.
Do any of you have experience with this issue or know of a workaround for
serializing a Table API plan?
Below is code to reproduce the issue.
Thanks,
Daniel Henneberger
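import java.util.List;
import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

private void test() {
    EnvironmentSettings settings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    // Create a table of values
    Table table = tEnv.fromValues(createNestedDatatype(),
            Row.of(List.of(Row.of("nested")), "name"));
    tEnv.createTemporaryView("table1", table);
    // Invoke the unnest operation
    Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
            + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");
    StatementSet statementSet = tEnv.createStatementSet();
    statementSet.addInsert(TableDescriptor.forConnector("print").build(),
            unnested);
    // Serialize the plan
    CompiledPlan plan = statementSet.compilePlan();
    String json = plan.asJsonString();
    // Attempt to load the plan.
    // This fails with the error: "Could not resolve internal system
    // function '$UNNEST_ROWS$1'. This is a bug, please file an issue."
    CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
    plan2.execute().print();
}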
private DataType createNestedDatatype() {
    return DataTypes.ROW(
            DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
                    DataTypes.FIELD("nested", DataTypes.STRING())
            ))),
            DataTypes.FIELD("name", DataTypes.STRING()));
}