[ https://issues.apache.org/jira/browse/FLINK-38624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18058932#comment-18058932 ]

EMERSON WANG commented on FLINK-38624:
--------------------------------------

[~Sergey Nuyanzin] Thanks for the quick response and for the fix! I've reviewed 
the PR changes, and the new inferOutputRowType logic in 
StreamPhysicalOverAggregateRule correctly addresses the root cause by ensuring 
that the physical row types (with materialized time indicators) are used rather 
than the logical ones. I also see that the new test case 
testTemporalJoinWithWatermarks mirrors our specific job failure scenario. Based 
on the code and the tests, this looks like a solid fix for the regression.
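
For anyone following along, here is a rough conceptual model of that change as I 
understand it. It is only an illustrative sketch in plain Scala, not the actual 
planner code: the Field type, the OverAggregateTypeSketch object, and every name 
in it are made up to show the idea of deriving the OVER aggregate's output row 
type from the physical input, where rowtime indicators have already been 
materialized.
{code:scala}
// Illustrative model only -- NOT the Flink planner API.
// A field type that may carry a rowtime indicator.
case class Field(name: String, sqlType: String, isRowtime: Boolean = false) {
  override def toString: String =
    s"$name: $sqlType${if (isRowtime) " *ROWTIME*" else ""}"
}

object OverAggregateTypeSketch {

  // Materializing a time indicator turns TIMESTAMP(3) *ROWTIME* into TIMESTAMP(3).
  def materializeTimeIndicators(fields: Seq[Field]): Seq[Field] =
    fields.map(f => if (f.isRowtime) f.copy(isRowtime = false) else f)

  // Build the OVER aggregate's output row type from the physical input fields
  // (indicators materialized) plus the aggregate result fields, instead of
  // reusing a logical row type that may still carry the *ROWTIME* marker.
  def inferOutputRowType(physicalInput: Seq[Field], aggFields: Seq[Field]): Seq[Field] =
    materializeTimeIndicators(physicalInput) ++ aggFields

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Field("product_id", "STRING"),
      Field("amount", "BIGINT"),
      Field("mod_record_ts", "TIMESTAMP(3)", isRowtime = true))

    val output = inferOutputRowType(input, Seq(Field("prev_amount", "BIGINT")))
    // mod_record_ts now prints as plain TIMESTAMP(3), matching the physical rel.
    output.foreach(println)
  }
}
{code}
If the logical input row type were used instead, mod_record_ts would still carry 
the *ROWTIME* marker, which is exactly the TIMESTAMP(3) vs TIMESTAMP(3) *ROWTIME* 
mismatch in the reported exception.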

> Flink 2.1 Job Failure: Type Mismatch Exception in 
> StreamPhysicalOverAggregateRule (TIMESTAMP(3) vs TIMESTAMP(3) ROWTIME)
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38624
>                 URL: https://issues.apache.org/jira/browse/FLINK-38624
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: EMERSON WANG
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flink_example.py
>
>
> Our PyFlink Table API jobs run successfully in Google Kubernetes Engine (GKE) 
> with Flink 1.19.0 and 1.20.0, but fail with Flink 2.1.0. The log shows the 
> following error:
> {noformat}
> java.lang.RuntimeException: Error while applying rule 
> StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL)
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: ... mod_record_ts: TIMESTAMP(3) -> TIMESTAMP(3) *ROWTIME* ...
> {noformat}
> The job fails while executing a statement set, with a type mismatch involving 
> the mod_record_ts field. The error appears to be related to the planner's 
> handling of time attributes in Table SQL jobs.
> The field mod_record_ts is used only for the watermark in the following table:
> {code:sql}
> CREATE TABLE mod_code_pt (
>   id INT,
>   mod_code_pt STRING,
>   waveform STRING,
>   snr DOUBLE,
>   bits_per_sym DOUBLE,
>   direction STRING,
>   spread_factor INT,
>   record_ts STRING,
>   dt STRING,
>   ts_year INT,
>   ts_month INT,
>   ts_day INT,
>   ts_hr INT,
>   mod_record_ts AS TO_TIMESTAMP(record_ts),
>   PRIMARY KEY (mod_code_pt) NOT ENFORCED,
>   WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'gs://<path>',
>   'format' = 'parquet',
>   'source.monitor-interval' = '1 d'
> )
> {code}
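> The statement set itself is not reproduced here; the sink table and the 
> probe-side events table in the sketch below are illustrative placeholders, not 
> part of the actual job. Only the query shape matters: a temporal join on the 
> watermarked table above, followed by an OVER aggregation over the join result.
> {code:sql}
> -- Hypothetical sink (some_sink) and probe-side table (events with columns
> -- mod_code_pt, amount, and a rowtime attribute event_ts); only the shape of
> -- the statement set is meaningful here.
> EXECUTE STATEMENT SET
> BEGIN
>   INSERT INTO some_sink
>   SELECT
>     COUNT(e.amount) OVER (PARTITION BY e.mod_code_pt) AS prev_count
>   FROM events AS e
>   LEFT JOIN mod_code_pt FOR SYSTEM_TIME AS OF e.event_ts AS p
>     ON e.mod_code_pt = p.mod_code_pt;
> END;
> {code}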
> Steps to Reproduce:
> 1. Deploy the PyFlink Table API job on GKE with Flink 2.1.0.
> 2. Use table definitions similar to those in the log (see the CREATE TABLE 
> statements).
> 3. Run the job; observe the failure with a type mismatch in the planner.
> Expected Behavior:
> The job should execute successfully, as it does in Flink 1.19.0/1.20.0.
> Updated: JUnit test to reproduce
> {code:scala}
>   @Test
>   def testTemporalJoinWithWatermarks(): Unit = {
>     util.addTable(s"""
>                      |CREATE TABLE orders (
>                      |  product_id STRING,
>                      |  amount BIGINT,
>                      |  order_ts TIMESTAMP(3),
>                      |  WATERMARK FOR order_ts AS order_ts - INTERVAL '5' SECONDS
>                      |) WITH (
>                      |  'connector' = 'values'
>                      |)
>                      |""".stripMargin)
>     util.addTable(s"""
>                      |CREATE TABLE products (
>                      |  product_id STRING,
>                      |  record_ts STRING,
>                      |  mod_record_ts AS TO_TIMESTAMP(record_ts),
>                      |  PRIMARY KEY (product_id) NOT ENFORCED,
>                      |  WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
>                      |) WITH (
>                      |  'connector' = 'values'
>                      |)
>                      |""".stripMargin)
>     util.verifyExecPlan(s"""
>                            |SELECT count(o.amount) OVER (PARTITION BY o.product_id) AS prev_amount
>                            |FROM orders AS o
>                            |LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_ts AS p
>                            |ON o.product_id = p.product_id
>                            |""".stripMargin)
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)