Caizhi Weng created FLINK-25227: ----------------------------------- Summary: Comparing the equality of the same (boxed) numeric values returns false Key: FLINK-25227 URL: https://issues.apache.org/jira/browse/FLINK-25227 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.13.3, 1.12.5, 1.14.0 Reporter: Caizhi Weng Fix For: 1.15.0, 1.14.1, 1.13.4
Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug. {code:scala} @Test def myTest(): Unit = { val data = Seq( Row.of( java.lang.Integer.valueOf(1000), java.lang.Integer.valueOf(2000), java.lang.Integer.valueOf(1000), java.lang.Integer.valueOf(2000)) ) tEnv.executeSql( s""" |create table T ( | a int, | b int, | c int, | d int |) with ( | 'connector' = 'values', | 'bounded' = 'true', | 'data-id' = '${TestValuesTableFactory.registerData(data)}' |) |""".stripMargin) tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print() } {code} The result is false, which is obviously incorrect. This is caused by the generated java code: {code:java} public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; org.apache.flink.table.data.BoxedWrapperRowData out = new org.apache.flink.table.data.BoxedWrapperRowData(1); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public StreamExecCalc$8( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { this.references = references; this.setup(task, config, output); if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) .setProcessingTimeService(processingTimeService); } } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.data.RowData in1 = 
(org.apache.flink.table.data.RowData) element.getValue(); int field$0; boolean isNull$0; int field$1; boolean isNull$1; int field$3; boolean isNull$3; int field$4; boolean isNull$4; boolean isNull$6; boolean result$7; isNull$3 = in1.isNullAt(2); field$3 = -1; if (!isNull$3) { field$3 = in1.getInt(2); } isNull$0 = in1.isNullAt(0); field$0 = -1; if (!isNull$0) { field$0 = in1.getInt(0); } isNull$1 = in1.isNullAt(1); field$1 = -1; if (!isNull$1) { field$1 = in1.getInt(1); } isNull$4 = in1.isNullAt(3); field$4 = -1; if (!isNull$4) { field$4 = in1.getInt(3); } out.setRowKind(in1.getRowKind()); java.lang.Integer result$2 = field$0; boolean nullTerm$2 = false; if (!nullTerm$2) { java.lang.Integer cur$2 = field$0; if (isNull$0) { nullTerm$2 = true; } else { int compareResult = result$2.compareTo(cur$2); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$2 = cur$2; } } } if (!nullTerm$2) { java.lang.Integer cur$2 = field$1; if (isNull$1) { nullTerm$2 = true; } else { int compareResult = result$2.compareTo(cur$2); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$2 = cur$2; } } } if (nullTerm$2) { result$2 = null; } java.lang.Integer result$5 = field$3; boolean nullTerm$5 = false; if (!nullTerm$5) { java.lang.Integer cur$5 = field$3; if (isNull$3) { nullTerm$5 = true; } else { int compareResult = result$5.compareTo(cur$5); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$5 = cur$5; } } } if (!nullTerm$5) { java.lang.Integer cur$5 = field$4; if (isNull$4) { nullTerm$5 = true; } else { int compareResult = result$5.compareTo(cur$5); if ((true && compareResult < 0) || (compareResult > 0 && !true)) { result$5 = cur$5; } } } if (nullTerm$5) { result$5 = null; } isNull$6 = nullTerm$2 || nullTerm$5; result$7 = false; if (!isNull$6) { result$7 = result$2 == result$5; } if (isNull$6) { out.setNullAt(0); } else { out.setBoolean(0, result$7); } output.collect(outElement.replace(out)); } @Override public void close() 
throws Exception { super.close(); } } {code} You can see that line 137 (the statement {{result$7 = result$2 == result$5;}}) compares two boxed {{Integer}} values with {{==}} instead of {{.equals}}, which causes this problem: {{==}} compares object references, and values such as 1000 and 2000 lie outside the default {{Integer}} cache range (-128 to 127), so each boxing produces a distinct object. In older Flink versions, where the return types of {{cast}} functions are also boxed types, casting strings to numeric values is also affected by this bug. For now, as a quick fix, we can rewrite the generated code. As a long-term solution, however, we shouldn't use boxed types as internal data structures. -- This message was sent by Atlassian Jira (v8.20.1#820001)