Caizhi Weng created FLINK-25227:
-----------------------------------

             Summary: Comparing the equality of the same (boxed) numeric values 
returns false
                 Key: FLINK-25227
                 URL: https://issues.apache.org/jira/browse/FLINK-25227
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.13.3, 1.12.5, 1.14.0
            Reporter: Caizhi Weng
             Fix For: 1.15.0, 1.14.1, 1.13.4


Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.

{code:scala}
// Reproduction for FLINK-25227: a single-row table in which the column pairs (a, b) and (c, d)
// hold the same values, so greatest(a, b) and greatest(c, d) are both 2000.
@Test
def myTest(): Unit = {
  // One row: a = 1000, b = 2000, c = 1000, d = 2000.
  val data = Seq(
    Row.of(
      java.lang.Integer.valueOf(1000),
      java.lang.Integer.valueOf(2000),
      java.lang.Integer.valueOf(1000),
      java.lang.Integer.valueOf(2000))
  )

  // Register the row under a data id and expose it as bounded table T via the 'values' connector.
  tEnv.executeSql(
    s"""
       |create table T (
       |  a int,
       |  b int,
       |  c int,
       |  d int
       |) with (
       |  'connector' = 'values',
       |  'bounded' = 'true',
       |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
       |)
       |""".stripMargin)

  // Both sides evaluate to 2000, so the expected output is true; this report observes false.
  tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
}
{code}

The result is false, which is obviously incorrect: both sides evaluate to 2000, so the expected result is true.

This is caused by the generated java code:
{code:java}
/**
 * Generated calc operator, quoted verbatim as evidence for this report.
 *
 * <p>For each input row it reads four int fields, folds fields 0/1 and fields 2/3 into two boxed
 * {@code java.lang.Integer} results (keeping the larger value of each pair), and emits a single
 * boolean column holding the comparison of those two results.
 *
 * <p>NOTE: the final comparison in {@code processElement} uses {@code ==} on the two
 * {@code java.lang.Integer} results, which is the defect this report describes.
 */
public class StreamExecCalc$8 extends 
org.apache.flink.table.runtime.operators.TableStreamOperator
        implements 
org.apache.flink.streaming.api.operators.OneInputStreamOperator {

    private final Object[] references;
    // Single-column output row, reused for every processed element.
    org.apache.flink.table.data.BoxedWrapperRowData out =
            new org.apache.flink.table.data.BoxedWrapperRowData(1);
    // Output record wrapper, reused via replace(...) in processElement.
    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
outElement =
            new 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

    /**
     * Stores the references array, calls {@code setup(task, config, output)}, and passes the
     * processing time service on when this instance is an {@code AbstractStreamOperator}.
     */
    public StreamExecCalc$8(
            Object[] references,
            org.apache.flink.streaming.runtime.tasks.StreamTask task,
            org.apache.flink.streaming.api.graph.StreamConfig config,
            org.apache.flink.streaming.api.operators.Output output,
            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService 
processingTimeService)
            throws Exception {
        this.references = references;

        this.setup(task, config, output);
        if (this instanceof 
org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) 
this)
                    .setProcessingTimeService(processingTimeService);
        }
    }

    /** Delegates to the superclass; this operator adds no initialization of its own. */
    @Override
    public void open() throws Exception {
        super.open();
    }

    /**
     * Evaluates the projection for one input row and emits a one-column result row: null if any
     * of the four input fields is null, otherwise the boolean outcome of comparing the two
     * folded results.
     */
    @Override
    public void 
processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord 
element)
            throws Exception {
        org.apache.flink.table.data.RowData in1 =
                (org.apache.flink.table.data.RowData) element.getValue();

        int field$0;
        boolean isNull$0;
        int field$1;
        boolean isNull$1;
        int field$3;
        boolean isNull$3;
        int field$4;
        boolean isNull$4;
        boolean isNull$6;
        boolean result$7;

        // Read the four input fields as primitive ints; -1 is only a placeholder for null fields,
        // whose isNull$N flag is checked before the value is compared.
        isNull$3 = in1.isNullAt(2);
        field$3 = -1;
        if (!isNull$3) {
            field$3 = in1.getInt(2);
        }
        isNull$0 = in1.isNullAt(0);
        field$0 = -1;
        if (!isNull$0) {
            field$0 = in1.getInt(0);
        }
        isNull$1 = in1.isNullAt(1);
        field$1 = -1;
        if (!isNull$1) {
            field$1 = in1.getInt(1);
        }
        isNull$4 = in1.isNullAt(3);
        field$4 = -1;
        if (!isNull$4) {
            field$4 = in1.getInt(3);
        }

        out.setRowKind(in1.getRowKind());

        // First fold (fields 0 and 1). Assigning the primitive int to java.lang.Integer boxes it.
        java.lang.Integer result$2 = field$0;
        boolean nullTerm$2 = false;

        // First operand is compared with itself (compareTo yields 0), so this pass only
        // contributes the null check on field 0.
        if (!nullTerm$2) {
            java.lang.Integer cur$2 = field$0;
            if (isNull$0) {
                nullTerm$2 = true;
            } else {
                int compareResult = result$2.compareTo(cur$2);
                // With the constant 'true' the condition reduces to compareResult < 0, i.e. the
                // current value replaces the result when it is larger.
                if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
                    result$2 = cur$2;
                }
            }
        }

        if (!nullTerm$2) {
            java.lang.Integer cur$2 = field$1;
            if (isNull$1) {
                nullTerm$2 = true;
            } else {
                int compareResult = result$2.compareTo(cur$2);
                if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
                    result$2 = cur$2;
                }
            }
        }

        // Any null operand makes the whole fold null.
        if (nullTerm$2) {
            result$2 = null;
        }

        // Second fold (fields 2 and 3), same shape as the first; its values are boxed separately
        // from those of the first fold.
        java.lang.Integer result$5 = field$3;
        boolean nullTerm$5 = false;

        if (!nullTerm$5) {
            java.lang.Integer cur$5 = field$3;
            if (isNull$3) {
                nullTerm$5 = true;
            } else {
                int compareResult = result$5.compareTo(cur$5);
                if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
                    result$5 = cur$5;
                }
            }
        }

        if (!nullTerm$5) {
            java.lang.Integer cur$5 = field$4;
            if (isNull$4) {
                nullTerm$5 = true;
            } else {
                int compareResult = result$5.compareTo(cur$5);
                if ((true && compareResult < 0) || (compareResult > 0 && 
!true)) {
                    result$5 = cur$5;
                }
            }
        }

        if (nullTerm$5) {
            result$5 = null;
        }

        isNull$6 = nullTerm$2 || nullTerm$5;
        result$7 = false;
        if (!isNull$6) {

            // BUG (subject of this report): both operands are java.lang.Integer, so == compares
            // object references rather than numeric values. Boxing is only guaranteed to reuse
            // instances for values in -128..127; for the 2000 of the reproduction the two boxes
            // are typically distinct objects, so this yields false although the values are equal.
            result$7 = result$2 == result$5;
        }

        if (isNull$6) {
            out.setNullAt(0);
        } else {
            out.setBoolean(0, result$7);
        }

        output.collect(outElement.replace(out));
    }

    /** Delegates to the superclass; this operator holds nothing of its own to release. */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
{code}

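You can see that line 137 of the generated code (the statement 
{{result$7 = result$2 == result$5;}}; line numbers in the listing above are 
shifted by mail line wrapping) compares two boxed Integer values with {{==}} 
instead of {{.equals}}, which causes this problem.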

In older Flink versions where the return types of {{cast}} functions are also 
boxed types, casting strings to numeric values is also affected by this bug.

As a quick fix, we can rewrite the generated code. As a long-term solution, 
however, we shouldn't use boxed types as internal data structures.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to