[ https://issues.apache.org/jira/browse/FLINK-25227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantin Knauf updated FLINK-25227: ------------------------------------- Fix Version/s: (was: 1.14.4) > Comparing the equality of the same (boxed) numeric values returns false > ----------------------------------------------------------------------- > > Key: FLINK-25227 > URL: https://issues.apache.org/jira/browse/FLINK-25227 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.14.0, 1.12.5, 1.13.3 > Reporter: Caizhi Weng > Assignee: Caizhi Weng > Priority: Critical > Labels: pull-request-available > Fix For: 1.13.7, 1.14.5 > > > Add the following test case to {{TableEnvironmentITCase}} to reproduce this > bug. > {code:scala} > @Test > def myTest(): Unit = { > val data = Seq( > Row.of( > java.lang.Integer.valueOf(1000), > java.lang.Integer.valueOf(2000), > java.lang.Integer.valueOf(1000), > java.lang.Integer.valueOf(2000)) > ) > tEnv.executeSql( > s""" > |create table T ( > | a int, > | b int, > | c int, > | d int > |) with ( > | 'connector' = 'values', > | 'bounded' = 'true', > | 'data-id' = '${TestValuesTableFactory.registerData(data)}' > |) > |""".stripMargin) > tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print() > } > {code} > The result is false, which is obviously incorrect. 
> This is caused by the generated java code: > {code:java} > public class StreamExecCalc$8 extends > org.apache.flink.table.runtime.operators.TableStreamOperator > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator { > private final Object[] references; > org.apache.flink.table.data.BoxedWrapperRowData out = > new org.apache.flink.table.data.BoxedWrapperRowData(1); > private final > org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = > new > org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > public StreamExecCalc$8( > Object[] references, > org.apache.flink.streaming.runtime.tasks.StreamTask task, > org.apache.flink.streaming.api.graph.StreamConfig config, > org.apache.flink.streaming.api.operators.Output output, > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) > throws Exception { > this.references = references; > this.setup(task, config, output); > if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) { > > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) > .setProcessingTimeService(processingTimeService); > } > } > @Override > public void open() throws Exception { > super.open(); > } > @Override > public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) > throws Exception { > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) element.getValue(); > int field$0; > boolean isNull$0; > int field$1; > boolean isNull$1; > int field$3; > boolean isNull$3; > int field$4; > boolean isNull$4; > boolean isNull$6; > boolean result$7; > isNull$3 = in1.isNullAt(2); > field$3 = -1; > if (!isNull$3) { > field$3 = in1.getInt(2); > } > isNull$0 = in1.isNullAt(0); > field$0 = -1; > if (!isNull$0) { > field$0 = in1.getInt(0); > } > isNull$1 = in1.isNullAt(1); > field$1 = -1; > if (!isNull$1) { > field$1 = in1.getInt(1); > } > isNull$4 = 
in1.isNullAt(3); > field$4 = -1; > if (!isNull$4) { > field$4 = in1.getInt(3); > } > out.setRowKind(in1.getRowKind()); > java.lang.Integer result$2 = field$0; > boolean nullTerm$2 = false; > if (!nullTerm$2) { > java.lang.Integer cur$2 = field$0; > if (isNull$0) { > nullTerm$2 = true; > } else { > int compareResult = result$2.compareTo(cur$2); > if ((true && compareResult < 0) || (compareResult > 0 && > !true)) { > result$2 = cur$2; > } > } > } > if (!nullTerm$2) { > java.lang.Integer cur$2 = field$1; > if (isNull$1) { > nullTerm$2 = true; > } else { > int compareResult = result$2.compareTo(cur$2); > if ((true && compareResult < 0) || (compareResult > 0 && > !true)) { > result$2 = cur$2; > } > } > } > if (nullTerm$2) { > result$2 = null; > } > java.lang.Integer result$5 = field$3; > boolean nullTerm$5 = false; > if (!nullTerm$5) { > java.lang.Integer cur$5 = field$3; > if (isNull$3) { > nullTerm$5 = true; > } else { > int compareResult = result$5.compareTo(cur$5); > if ((true && compareResult < 0) || (compareResult > 0 && > !true)) { > result$5 = cur$5; > } > } > } > if (!nullTerm$5) { > java.lang.Integer cur$5 = field$4; > if (isNull$4) { > nullTerm$5 = true; > } else { > int compareResult = result$5.compareTo(cur$5); > if ((true && compareResult < 0) || (compareResult > 0 && > !true)) { > result$5 = cur$5; > } > } > } > if (nullTerm$5) { > result$5 = null; > } > isNull$6 = nullTerm$2 || nullTerm$5; > result$7 = false; > if (!isNull$6) { > result$7 = result$2 == result$5; > } > if (isNull$6) { > out.setNullAt(0); > } else { > out.setBoolean(0, result$7); > } > output.collect(outElement.replace(out)); > } > @Override > public void close() throws Exception { > super.close(); > } > } > {code} > You can see that line 137 compares two boxed Integer types with {{==}} > instead of {{.equals}}, which causes this problem. 
> In older Flink versions, where the return types of {{cast}} functions are also > boxed types, casting strings to numeric values is also affected by this bug. > As a quick fix, we can currently rewrite the generated code. But as a long-term > solution, we shouldn't use boxed types as internal data structures. -- This message was sent by Atlassian Jira (v8.20.1#820001)