Hi Daniel,

Thanks for reporting this issue. Based on the FLIP [1], this looks like
a bug, and I've created a Jira ticket [2] to track it.

> We will introduce a declarative concept to `BuiltInFunctionDefinitions`
> and `FlinkSqlOperatorTable` that maintain a function name + version to
> instance mapping.
>
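To illustrate what a "function name + version to instance mapping" could look like, here is a minimal sketch. It is not Flink's actual implementation: the class and method names are hypothetical, and the "$NAME$VERSION" format is only inferred from the '$UNNEST_ROWS$1' string in the reported error message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch only; these are not Flink classes.
final class VersionedFunctionRegistry {
    // Key is "NAME$VERSION", for example "UNNEST_ROWS$1".
    private final Map<String, Object> instances = new HashMap<>();

    // Registers a function instance under a name and a version.
    void register(String name, int version, Object instance) {
        instances.put(name + "$" + version, instance);
    }

    // Looks up the instance for an exact name and version pair.
    Optional<Object> lookup(String name, int version) {
        return Optional.ofNullable(instances.get(name + "$" + version));
    }

    // Resolves a serialized name of the assumed form "$NAME$VERSION".
    // Returns empty if the string is malformed or nothing is registered.
    Optional<Object> resolveSerialized(String serialized) {
        int sep = serialized.lastIndexOf('$');
        if (!serialized.startsWith("$") || sep <= 0) {
            return Optional.empty();
        }
        String name = serialized.substring(1, sep);
        try {
            int version = Integer.parseInt(serialized.substring(sep + 1));
            return lookup(name, version);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

In this sketch, the failure Daniel reported would correspond to resolveSerialized("$UNNEST_ROWS$1") returning empty because no instance was ever registered under that name and version.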

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489#FLIP190:SupportVersionUpgradesforTableAPI&SQLPrograms-FunctionVersioning
[2] https://issues.apache.org/jira/browse/FLINK-31182

Best,
Jane

On Wed, Feb 22, 2023 at 9:55 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:

> Hi, Daniel Henneberger.
> Thanks for reporting. This looks like a bug to me. Could you please create
> a Jira issue [1] for it?
> As a workaround, is it possible to avoid UNNEST? Maybe you can try the
> EXPLODE function instead, since the Flink planner rewrites UNNEST to an
> explode function in its implementation [2].
>
> [1] https://issues.apache.org/jira/projects/FLINK/issues/
> [2]
> https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
>
> Best regards,
> Yuxia
>
> ------------------------------
> *From: *"Daniel Henneberger" <henneberger.dan...@gmail.com>
> *To: *"User" <user@flink.apache.org>
> *Sent: *Wednesday, February 22, 2023, 5:35:02 AM
> *Subject: *Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API
>
> Dear Apache Flink community,
>
> I could use some help with a serialization issue I'm having while using
> the Table API. Specifically, I'm trying to deserialize a serialized
> CompiledPlan, but I'm running into trouble with the UNNEST_ROWS operation.
> It seems that the CompiledPlan deserializer isn't looking up any functions
> in the BuiltInFunctionDefinitions class, which is causing the
> de-serialization to fail.
>
> Do any of you have experience with this issue or know of a workaround for
> serializing a Table API plan?
>
> Below is code to replicate.
>
> Thanks,
> Daniel Henneberger
>
> private void test() {
>   EnvironmentSettings settings =
>       EnvironmentSettings.newInstance().inStreamingMode().build();
>   TableEnvironment tEnv = TableEnvironment.create(settings);
>
>   // Create a table of values
>   Table table = tEnv.fromValues(createNestedDatatype(),
>       Row.of(List.of(Row.of("nested")), "name"));
>   tEnv.createTemporaryView("table1", table);
>
>   // Invoke the unnest operation
>   Table unnested = tEnv.sqlQuery("SELECT name, nested\n"
>       + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");
>
>   StatementSet statementSet = tEnv.createStatementSet();
>   statementSet.addInsert(
>       TableDescriptor.forConnector("print").build(), unnested);
>
>   // Serialize the plan
>   CompiledPlan plan = statementSet.compilePlan();
>   String json = plan.asJsonString();
>
>   // Attempt to load the plan
>   // This fails with the error: "Could not resolve internal system function
>   // '$UNNEST_ROWS$1'. This is a bug, please file an issue."
>   CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
>   plan2.execute().print();
> }
>
> private DataType createNestedDatatype() {
>   return DataTypes.ROW(
>       DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
>           DataTypes.FIELD("nested", DataTypes.STRING())
>       ))),
>       DataTypes.FIELD("name", DataTypes.STRING()));
> }
>
>
>
