Dear Apache Flink community,

I could use some help with a serialization issue I'm having while using the
Table API. Specifically, I'm trying to deserialize a serialized
CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
It seems that the CompiledPlan deserializer isn't looking up any functions
in the BuiltInFunctionDefinitions class, which is causing the
deserialization to fail.

Do any of you have experience with this issue or know of a workaround for
serializing a Table API plan?

Below is code to reproduce the issue.

Thanks,
Daniel Henneberger

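import java.util.List;

import org.apache.flink.table.api.CompiledPlan;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.PlanReference;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

private void test() {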
  EnvironmentSettings settings =
      EnvironmentSettings.newInstance().inStreamingMode().build();
  TableEnvironment tEnv = TableEnvironment.create(settings);

  // Create a table of values
  Table table = tEnv.fromValues(createNestedDatatype(),
      Row.of(List.of(Row.of("nested")), "name"));
  tEnv.createTemporaryView("table1", table);

  // Invoke the unnest operation
  Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
      + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");

  StatementSet statementSet = tEnv.createStatementSet();
  statementSet.addInsert(TableDescriptor.forConnector("print").build(),
      unnested);

  // Serialize the plan
  CompiledPlan plan = statementSet.compilePlan();
  String json = plan.asJsonString();

  // Attempt to load the plan
  // This fails with the error 'Could not resolve internal system
  // function '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
  CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
  plan2.execute().print();
}

private DataType createNestedDatatype() {
  return DataTypes.ROW(
      DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
          DataTypes.FIELD("nested", DataTypes.STRING())
      ))),
      DataTypes.FIELD("name", DataTypes.STRING()));
}
