Ramin Gharib created FLINK-40039:
------------------------------------
Summary: CTAS/RTAS do not work for PTFs with set semantics and PARTITION BY
Key: FLINK-40039
URL: https://issues.apache.org/jira/browse/FLINK-40039
Project: Flink
Issue Type: Bug
Components: Table SQL / API, Table SQL / Planner
Reporter: Ramin Gharib
{{CREATE TABLE ... AS SELECT}} (CTAS) and {{REPLACE TABLE ... AS}} (RTAS) over a set-semantic
process table function (a PTF whose table argument uses {{PARTITION BY}}) crashed
during planning with {{IllegalStateException: must call validate first}}.
h3. Steps to reproduce
{code:sql}
CREATE TABLE sink_tbl
WITH ('connector' = 'values')
AS
SELECT * FROM f(r => TABLE t PARTITION BY name, i => 1);
{code}
where {{f}} is a PTF with set semantics. Planning fails with:
{code:java}
java.lang.IllegalStateException: must call validate first
at IdentifierNamespace.resolve(...)
at SqlToRelConverter.substituteSubQueryOfSetSemanticsInputTable(...)
...
at SqlCreateTableAsConverter.convertSqlNode(...)
{code}
h3. Root Cause
Planning a query is a pipeline: parse → validate → convert (sql-to-rel). During
validation, Calcite attaches a "namespace" to each table reference. During
conversion, {{SqlToRelConverter}} needs that namespace to resolve the table.
CTAS and RTAS validated and converted the AS-query {*}twice{*}:
# once to derive the new table's columns, and
# again inside a helper ({{MergeTableAsUtil.maybeRewriteQuery}}) that *rebuilt the query as new SQL, then re-validated and re-converted it* to reconcile its columns with the sink.
On that second pass the set-semantic input table no longer had a namespace, so
sql-to-rel conversion ({{SqlToRelConverter.substituteSubQueryOfSetSemanticsInputTable}})
hit a table without one and threw {{must call validate first}}.
Plain {{INSERT}} and MT-AS validate the query only once, so they were never affected.
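To make the failure mode concrete, here is a minimal toy model in Java. Everything in it is hypothetical: {{NamespaceDemo}}, {{TableNode}} and {{Namespace}} are not Calcite or Flink classes, and keying namespaces by node identity is a simplifying assumption, not a description of the exact code path in this bug. It only shows the general pattern: a converter that looks up the namespace of a node the validator never registered throws {{must call validate first}}, while the originally validated node converts fine.

{code:java}
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical toy model, NOT Calcite's API: a validator that attaches a
// "namespace" to each table node it validates, and a converter that needs it.
class NamespaceDemo {

    // Stand-in for a table reference in the parsed query (a SqlNode).
    static final class TableNode {
        final String name;
        TableNode(String name) { this.name = name; }
    }

    // Stand-in for a validator namespace: here just the resolved table name.
    static final class Namespace {
        final String resolvedTable;
        Namespace(String resolvedTable) { this.resolvedTable = resolvedTable; }
    }

    // Assumption: namespaces are keyed by node identity, not equality,
    // so a freshly built copy of a node has no namespace registered.
    private final Map<TableNode, Namespace> namespaces = new IdentityHashMap<>();

    // "validate": register a namespace for this exact node object.
    void validate(TableNode node) {
        namespaces.put(node, new Namespace(node.name));
    }

    // "convert": requires the namespace; mimics the error seen in the trace.
    String convert(TableNode node) {
        Namespace ns = namespaces.get(node);
        if (ns == null) {
            throw new IllegalStateException("must call validate first");
        }
        // Stand-in for the RelNode the converter would produce.
        return "LogicalTableScan(table=[" + ns.resolvedTable + "])";
    }

    public static void main(String[] args) {
        NamespaceDemo planner = new NamespaceDemo();
        TableNode original = new TableNode("t");
        planner.validate(original);
        System.out.println(planner.convert(original));

        // A rebuilt query contains a new node object that was never validated.
        TableNode rebuilt = new TableNode(original.name);
        try {
            planner.convert(rebuilt);
        } catch (IllegalStateException e) {
            System.out.println("second pass failed: " + e.getMessage());
        }
    }
}
{code}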
h3. The Fix
{{maybeRewriteQuery}} reconciles the query's columns with the target table
(reordering columns and adding {{NULL}} for columns the query doesn't produce).
It used to do this by rewriting the query as new SQL and validating it again:
{code:java}
// BEFORE: build a new SQL query and validate + convert it again <-- second validation
SqlCall newSelect = rewriteCall(origQueryNode, ...);
return convert(newSelect);
{code}
Now it reshapes the columns with a relational projection placed on top of the
already-converted plan, so there is no second validation:
{code:java}
// AFTER: add one projection over the plan we already built
// existing column -> RexInputRef; missing column -> NULL literal
// force = true: create the Project even if it would be an identity projection
RelNode projected = relBuilder.push(queryRelNode)
        .project(projects, fieldNames, true)
        .build();
return new PlannerQueryOperation(projected, ...);
{code}
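The column mapping behind the {{projects}} list can be sketched without Calcite. In this minimal, hypothetical sketch, {{ProjectionSketch}} and {{reconcile}} are illustrative names, not Flink code, and matching sink columns to query columns by name is an assumption; the real {{maybeRewriteQuery}} may also handle types and other details not shown here. Each entry is either an input reference printed the way Calcite prints a {{RexInputRef}} ({{$0}}, {{$1}}, ...) or {{NULL}}.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Calcite-free sketch of the projection list built by the fix:
// for each sink column, reference the query column with the same name,
// or emit NULL when the query does not produce that column.
final class ProjectionSketch {

    // Printed like Calcite's RexInputRef ("$" + field index).
    static String inputRef(int index) {
        return "$" + index;
    }

    static final String NULL_LITERAL = "NULL";

    // Assumption: columns are matched by name; types and casts are ignored.
    static List<String> reconcile(List<String> queryFields, List<String> sinkColumns) {
        List<String> projects = new ArrayList<>();
        for (String column : sinkColumns) {
            int index = queryFields.indexOf(column);
            projects.add(index >= 0 ? inputRef(index) : NULL_LITERAL);
        }
        return projects;
    }

    public static void main(String[] args) {
        List<String> queryFields = List.of("name", "cnt");
        List<String> sinkColumns = List.of("cnt", "extra", "name");
        // Reorders existing columns and fills the missing one: [$1, NULL, $0]
        System.out.println(reconcile(queryFields, sinkColumns));
    }
}
{code}

Computing this mapping once and applying it as a projection over the already-converted {{RelNode}} means the plan from the first pass is reused as-is, which is why no second validation is needed.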