Hi, community

I encountered a scala.MatchError when trying to obtain the table plan for
the following query in Flink 1.18.1.

The input data is read from Kafka, and the query is intended to perform a
typical WordCount operation. The query is as follows. SPLIT_STRING is a
Table Function UDF that splits sentences into words by spaces.

```
SELECT
    window_start,
    word,
    COUNT(*) AS `count`
FROM
    TABLE(
        TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND)),
    LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
GROUP BY
    window_start,
    window_end,
    word
```

The error message received is:

```
[ERR] scala.MatchError:
rel#177237:FlinkLogicalCorrelate.LOGICAL.any.None:
0.[NONE].[NONE](left=RelSubset#177235,right=RelSubset#177236,correlation=$cor0,joinType=inner,requiredColumns={1})
(of class org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCorrelate)
```

I believe the issue lies in the existNeighbourWindowTableFunc method in
flink-table-planner/WindowUtil.scala, which does not handle the
FlinkLogicalCorrelate node when traversing the AST. (This method was
added in FLINK-32578.) I suspect the node comes from the LATERAL clause.
In this query the unhandled node is FlinkLogicalCorrelate, but I think
there might be other unhandled node types as well.

I have two questions regarding this:

   1. Is scala.MatchError the expected behavior in this case, or is this an
   unreported bug? I suspect the latter.
   2. In the code comments of the PR mentioned in the FLINK-32578 ticket, I
   found the terms "standard form" and "relax form." I searched for "relax
   form" in the Flink documentation but could not find any reference. What
   do these terms mean? As a workaround for this issue, using a WITH clause
   could be considered, but I am uncertain whether it is a universal
   solution.
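
For reference, a rough sketch of the WITH-clause workaround mentioned in question 2. It assumes the same input_table and SPLIT_STRING UDF as the query above; the CTE name `windowed` is only illustrative. It has not been verified against the 1.18.1 planner, so it may or may not avoid the MatchError.

```sql
-- Assumption: applying the window TVF inside a CTE first, then joining the
-- LATERAL table function against the CTE, changes the plan shape that
-- existNeighbourWindowTableFunc sees. This is unverified.
WITH windowed AS (
    SELECT
        window_start,
        window_end,
        sentence
    FROM
        TABLE(
            TUMBLE(TABLE input_table, DESCRIPTOR(proctime), INTERVAL '10' SECOND))
)
SELECT
    window_start,
    word,
    COUNT(*) AS `count`
FROM
    windowed,
    LATERAL TABLE(SPLIT_STRING(sentence)) AS T(word)
GROUP BY
    window_start,
    window_end,
    word
```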

Thank you for your assistance.
