[https://issues.apache.org/jira/browse/FLINK-38624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18037328#comment-18037328]

EMERSON WANG edited comment on FLINK-38624 at 11/12/25 5:24 PM:
----------------------------------------------------------------

The original PyFlink scripts in our environment are complex, involving joins 
with Kafka streams, and run within a GKE cluster. For this example, we 
simplified the table and view definitions and the PyFlink script to reproduce 
the errors using datagen and a CSV file, running in a GCP Dataproc cluster.

Please find the attached PyFlink script (`flink_example.py`) and the data file 
(`test_data.csv`), which we placed in the GCS bucket:
gs://tmp-test-dp-us-gsn-netdata-dev

Execution Environment:
 - *Platform*: GCP Dataproc
 - *Flink Version*: 2.1.0
 - *Download URL*: 
[https://archive.apache.org/dist/flink/flink-2.1.0/flink-2.1.0-bin-scala_2.12.tgz]

We successfully reproduced the same error encountered in GKE with the following 
CLI command:

ewang@dev-cluster-m:~$ flink run -m yarn-cluster -yid 
application_1762454865525_0001 -py ./flink_example.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/lib/flink-2.1.0/lib/log4j-slf4j-impl-2.24.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/lib/hadoop/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See [http://www.slf4j.org/codes.html#multiple_bindings] for an 
explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2025-11-10 23:57:33,316 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - 
Found Yarn properties file under /tmp/.yarn-properties-ewang.
2025-11-10 23:57:33,316 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - 
Found Yarn properties file under /tmp/.yarn-properties-ewang.
Traceback (most recent call last):
File "/home/ewang/./flink_example.py", line 487, in <module>
python_test()
File "/home/ewang/./flink_example.py", line 481, in python_test
table_env.execute_sql(ut_cid_printsink_sql)
File 
"/usr/lib/flink-2.1.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
 line 840, in execute_sql
File 
"/usr/lib/flink-2.1.0/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", 
line 1322, in __call__
File "/usr/lib/flink-2.1.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
line 162, in deco
File "/usr/lib/flink-2.1.0/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", 
line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.executeSql.
: java.lang.RuntimeException: Error while applying rule 
StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL), args 
[rel#697:FlinkLogicalOverAggregate.LOGICAL.any.None: 
0.[NONE].[NONE].[NONE](input=RelSubset#694,window#0=window(partition {3, 28} 
order by [62 ASC-nulls-first] aggs [LAG($18), LAG($19), LAG($41), LAG($42), 
LAG($43), LAG($44), LAG($45), LAG($46), LAG($47), LAG($48), LAG($49), LAG($50), 
LAG($51), LAG($56), LAG($57), LAG($61)]))]
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:317)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:208)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:123)
at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1434)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1012)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1242)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:873)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:273)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:288)
at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:174)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
... 38 more
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(DOUBLE fl_bits_per_sym, INTEGER carrierd_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" macaddr, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" bsid, INTEGER beam_id, INTEGER logical_beam_id, 
INTEGER satellite_id, BIGINT databus_ts, VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" serving_smac_host_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
smac_service_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
smac_source_name, INTEGER smac_domain_id, INTEGER access_network_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" nsp_id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" vno_id, BOOLEAN in_vno, DOUBLE fl_sinr, DOUBLE 
rl_sinr, BIGINT fl_byte_count, BIGINT rl_byte_count, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" online_time, BIGINT online_timestamp, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ho_from_bsid, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" state, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
fl_mod_code_pt, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rl_mod_code_pt, 
DOUBLE pa_compression, INTEGER terminal_type, BIGINT cid, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" type, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
sched_type, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" cls_type, INTEGER 
traffic_priority, INTEGER request_transmission_policy, BIGINT 
max_sustained_traffic_rate, BIGINT min_reserved_traffic_rate, INTEGER 
max_latency, INTEGER tolerated_jitter, INTEGER unsolicited_grant_interval, 
INTEGER svlan_id, BIGINT traffic_weight, BIGINT fl_pdu_count, BIGINT 
rl_pdu_count, BIGINT sdu_count, BIGINT byte_count, BIGINT mrtr_byte_count, 
BIGINT wfs_byte_count, BIGINT rl_burst_count, BIGINT rl_burst_err_count, BIGINT 
rl_expired_grant_count, BIGINT rl_requested_bytes, BIGINT rl_bytes_granted, 
INTEGER flq_num_samples, BIGINT flq_sum_queued_bytes, BIGINT 
flq_max_queued_bytes, BIGINT flq_all_time_max_queued_bytes, BIGINT 
fl_dropped_sdu_count, BIGINT fl_marked_sdu_count, INTEGER sdfid, INTEGER pdfid, 
BIGINT corr_id, TIMESTAMP(3) record_ts, TIMESTAMP(3) *ROWTIME* event_ts, 
INTEGER ts_year, INTEGER ts_month, INTEGER ts_day, INTEGER ts_hour, INTEGER 
ts_minute, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" dt, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ts_hr, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
ts_min, INTEGER id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" mod_code_pt, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" waveform, DOUBLE snr, DOUBLE 
bits_per_sym, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" direction, INTEGER 
spread_factor, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" record_ts0, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" dt0, INTEGER ts_year0, INTEGER 
ts_month0, INTEGER ts_day0, INTEGER ts_hr0, TIMESTAMP(3) mod_record_ts, BIGINT 
w0$o0, BIGINT w0$o1, BIGINT w0$o2, BIGINT w0$o3, BIGINT w0$o4, BIGINT w0$o5, 
BIGINT w0$o6, BIGINT w0$o7, BIGINT w0$o8, BIGINT w0$o9, BIGINT w0$o10, BIGINT 
w0$o11, BIGINT w0$o12, BIGINT w0$o13, BIGINT w0$o14, TIMESTAMP(3) w0$o15) NOT 
NULL
equiv rowtype: RecordType(DOUBLE fl_bits_per_sym, INTEGER carrierd_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" macaddr, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" bsid, INTEGER beam_id, INTEGER logical_beam_id, 
INTEGER satellite_id, BIGINT databus_ts, VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" serving_smac_host_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
smac_service_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
smac_source_name, INTEGER smac_domain_id, INTEGER access_network_id, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" nsp_id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" vno_id, BOOLEAN in_vno, DOUBLE fl_sinr, DOUBLE 
rl_sinr, BIGINT fl_byte_count, BIGINT rl_byte_count, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" online_time, BIGINT online_timestamp, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" ho_from_bsid, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" state, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
fl_mod_code_pt, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rl_mod_code_pt, 
DOUBLE pa_compression, INTEGER terminal_type, BIGINT cid, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" type, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
sched_type, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" cls_type, INTEGER 
traffic_priority, INTEGER request_transmission_policy, BIGINT 
max_sustained_traffic_rate, BIGINT min_reserved_traffic_rate, INTEGER 
max_latency, INTEGER tolerated_jitter, INTEGER unsolicited_grant_interval, 
INTEGER svlan_id, BIGINT traffic_weight, BIGINT fl_pdu_count, BIGINT 
rl_pdu_count, BIGINT sdu_count, BIGINT byte_count, BIGINT mrtr_byte_count, 
BIGINT wfs_byte_count, BIGINT rl_burst_count, BIGINT rl_burst_err_count, BIGINT 
rl_expired_grant_count, BIGINT rl_requested_bytes, BIGINT rl_bytes_granted, 
INTEGER flq_num_samples, BIGINT flq_sum_queued_bytes, BIGINT 
flq_max_queued_bytes, BIGINT flq_all_time_max_queued_bytes, BIGINT 
fl_dropped_sdu_count, BIGINT fl_marked_sdu_count, INTEGER sdfid, INTEGER pdfid, 
BIGINT corr_id, TIMESTAMP(3) record_ts, TIMESTAMP(3) *ROWTIME* event_ts, 
INTEGER ts_year, INTEGER ts_month, INTEGER ts_day, INTEGER ts_hour, INTEGER 
ts_minute, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" dt, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" ts_hr, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
ts_min, INTEGER id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" mod_code_pt, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" waveform, DOUBLE snr, DOUBLE 
bits_per_sym, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" direction, INTEGER 
spread_factor, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" record_ts0, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" dt0, INTEGER ts_year0, INTEGER 
ts_month0, INTEGER ts_day0, INTEGER ts_hr0, TIMESTAMP(3) *ROWTIME* 
mod_record_ts, BIGINT w0$o0, BIGINT w0$o1, BIGINT w0$o2, BIGINT w0$o3, BIGINT 
w0$o4, BIGINT w0$o5, BIGINT w0$o6, BIGINT w0$o7, BIGINT w0$o8, BIGINT w0$o9, 
BIGINT w0$o10, BIGINT w0$o11, BIGINT w0$o12, BIGINT w0$o13, BIGINT w0$o14, 
TIMESTAMP(3) w0$o15) NOT NULL
Difference:
mod_record_ts: TIMESTAMP(3) -> TIMESTAMP(3) *ROWTIME*

at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
... 42 more

org.apache.flink.client.program.ProgramAbortException: 
java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:138)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:343)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1017)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:230)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1261)
at 
org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$11(CliFrontend.java:1355)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1355)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1323)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:128)
... 17 more

Interestingly, we found that removing the following 15 fields from the sink 
table `print_table` (along with their associated output values) eliminates the 
error:
fl_dropped_sdu_count BIGINT,
fl_dropped_sdu_count_delta BIGINT,
fl_marked_sdu_count BIGINT,
fl_marked_sdu_count_delta BIGINT,
event_ts TIMESTAMP(3),
record_ts TIMESTAMP(3),
delta_record_ts INT,
ts_year INT,
ts_month INT,
ts_day INT,
ts_hour INT,
ts_minute INT,
dt STRING,
ts_hr STRING,
ts_min STRING
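
As a possible workaround (we have not verified this against the full job), the 
planner's `TIMESTAMP(3)` vs `TIMESTAMP(3) *ROWTIME*` mismatch on 
`mod_record_ts` suggests stripping the time attribute before the OVER/LAG 
aggregation. Casting a rowtime column to a plain `TIMESTAMP(3)` drops the 
`*ROWTIME*` marker; a hypothetical sketch against the `mod_code_pt` table 
defined in the issue (the view name is ours):

```sql
-- Hypothetical workaround sketch (untested): cast the rowtime column to a
-- plain TIMESTAMP(3) so the *ROWTIME* attribute is dropped before the
-- LAG/OVER aggregation consumes it.
CREATE TEMPORARY VIEW mod_code_pt_plain AS
SELECT
  id, mod_code_pt, waveform, snr, bits_per_sym, direction, spread_factor,
  record_ts, dt, ts_year, ts_month, ts_day, ts_hr,
  CAST(mod_record_ts AS TIMESTAMP(3)) AS mod_record_ts  -- drops *ROWTIME*
FROM mod_code_pt;
```

If the error disappears when this view is used in place of the raw table, that 
would point at the planner's propagation of the time attribute rather than the 
sink field list itself.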



> Flink 2.1 Job Failure: Type Mismatch Exception in 
> StreamPhysicalOverAggregateRule (TIMESTAMP(3) vs TIMESTAMP(3) ROWTIME)
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38624
>                 URL: https://issues.apache.org/jira/browse/FLINK-38624
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: EMERSON WANG
>            Priority: Major
>         Attachments: flink_example.py, test_data.csv
>
>
> Our PyFlink Table API jobs run successfully in Google Kubernetes Engine (GKE) 
> with Flink 1.19.0 and 1.20.0, but fail with Flink 2.1.0. The log shows the 
> following error:
> java.lang.RuntimeException: Error while applying rule 
> StreamPhysicalOverAggregateRule(in:LOGICAL,out:STREAM_PHYSICAL)
> Caused by: java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: ... mod_record_ts: TIMESTAMP(3) -> TIMESTAMP(3) *ROWTIME* ...
> The job fails during execution of a statement set, with a type mismatch 
> involving the mod_record_ts field. The error appears to be related to the 
> planner's handling of time attributes in Table SQL jobs.
> The table field mod_record_ts was used only for watermark in the following 
> table:
> CREATE TABLE mod_code_pt (
> id INT,
> mod_code_pt STRING,
> waveform STRING,
> snr double,
> bits_per_sym double,
> direction STRING,
> spread_factor INT,
> record_ts STRING,
> dt STRING,
> ts_year INT,
> ts_month INT,
> ts_day INT,
> ts_hr INT,
> mod_record_ts as TO_TIMESTAMP(record_ts),
> PRIMARY KEY (mod_code_pt) NOT ENFORCED,
> WATERMARK FOR mod_record_ts AS mod_record_ts - INTERVAL '60' SECONDS
> ) WITH (
> 'connector'='filesystem',
> 'path'='gs://<path>',
> 'format'='parquet',
> 'source.monitor-interval'='1 d'
> )
> Steps to Reproduce:
> Deploy PyFlink Table API job on GKE with Flink 2.1.0
> Use Table definitions similar to those in the log (see CREATE TABLE 
> statements).
> Run the job; observe failure with type mismatch in planner.
> Expected Behavior:
> Job should execute successfully as in Flink 1.19.0/1.20.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
