[ https://issues.apache.org/jira/browse/FLINK-17847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112759#comment-17112759 ]
Leonard Xu edited comment on FLINK-17847 at 5/21/20, 5:59 AM: -------------------------------------------------------------- Thanks [~libenchao] [~jark] [~lzljs3620320] for getting involved. I'm +1 on throwing an exception in the compile phase if the index is <= 0, and I'd also like to follow Calcite and return null when an ArrayIndexOutOfBoundsException would occur in the runtime phase. What do you think? was (Author: leonard xu): Thanks [~libenchao] [~jark] [~lzljs3620320] for involving. I'm +1 to throw a exception in compile phase if the index <=0, and I'm also like follow the calcite to return a null when happened ArrayIndexOutOfBoundsException in runtime phase. HDYT? > ArrayIndexOutOfBoundsException happens when codegen StreamExec operator > ----------------------------------------------------------------------- > > Key: FLINK-17847 > URL: https://issues.apache.org/jira/browse/FLINK-17847 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.10.0, 1.11.0 > Reporter: Leonard Xu > Priority: Major > Fix For: 1.10.0, 1.11.0 > > > use case: > {code:java} > //source table > create table json_table( > w_es BIGINT, > w_type STRING, > w_isDdl BOOLEAN, > w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, > account_pay_fee DOUBLE>>, > w_ts TIMESTAMP(3), > w_table STRING) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = '0.10', > 'connector.topic' = 'json-test2', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'test-jdbc', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'json', > 'format.derive-schema' = 'true' > ) > // real data: > 
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"} > //query > select w_ts, 'test' as city1_id, w_data[0].pay_info AS cate3_id, > w_data as pay_order_id from json_table > {code} > ~exception:~ > {code:java} > // > Caused by: java.lang.ArrayIndexOutOfBoundsException: 1427848Caused by: > java.lang.ArrayIndexOutOfBoundsException: 1427848 at > org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598) > at > org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590) > at > org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534) > at > org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117) > at StreamExecCalc$10.processElement(Unknown Source) > {code} > > It looks like some operation in the generated StreamExecCalc$10 operator accesses > index '-1', which is wrong; this bug exists in both 1.10 and 1.11 > > {code:java} > public class StreamExecCalc$10 extends > org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator > implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator { > private final Object[] references; > private final org.apache.flink.table.dataformat.BinaryString str$3 = > org.apache.flink.table.dataformat.BinaryString.fromString("test"); > private transient > org.apache.flink.table.runtime.typeutils.BaseArraySerializer typeSerializer$5; > final org.apache.flink.table.dataformat.BoxedWrapperRow out = new > org.apache.flink.table.dataformat.BoxedWrapperRow(4); > private final > org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new > org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > public StreamExecCalc$10( > 
Object[] references, > org.apache.flink.streaming.runtime.tasks.StreamTask task, > org.apache.flink.streaming.api.graph.StreamConfig config, > org.apache.flink.streaming.api.operators.Output output) throws > Exception { > this.references = references; > typeSerializer$5 = > (((org.apache.flink.table.runtime.typeutils.BaseArraySerializer) > references[0])); > this.setup(task, config, output); > } > @Override > public void open() throws Exception { > super.open(); > } > @Override > public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception { > org.apache.flink.table.dataformat.BaseRow in1 = > (org.apache.flink.table.dataformat.BaseRow) element.getValue(); > org.apache.flink.table.dataformat.SqlTimestamp field$2; > boolean isNull$2; > org.apache.flink.table.dataformat.BaseArray field$4; > boolean isNull$4; > org.apache.flink.table.dataformat.BaseArray field$6; > org.apache.flink.table.dataformat.BinaryString field$8; > boolean isNull$8; > org.apache.flink.table.dataformat.BinaryString result$9; > boolean isNull$9; > isNull$2 = in1.isNullAt(4); > field$2 = null; > if (!isNull$2) { > field$2 = in1.getTimestamp(4, 3); > } > isNull$4 = in1.isNullAt(3); > field$4 = null; > if (!isNull$4) { > field$4 = in1.getArray(3); > } > field$6 = field$4; > if (!isNull$4) { > field$6 = (org.apache.flink.table.dataformat.BaseArray) > (typeSerializer$5.copy(field$6)); > } > out.setHeader(in1.getHeader()); > if (isNull$2) { > out.setNullAt(0); > } else { > out.setNonPrimitiveValue(0, field$2); > } > if (false) { > out.setNullAt(1); > } else { > out.setNonPrimitiveValue(1, > ((org.apache.flink.table.dataformat.BinaryString) str$3)); > } > boolean isNull$7 = isNull$4 || false || field$6.isNullAt(((int) 0) - > 1); > org.apache.flink.table.dataformat.BaseRow result$7 = isNull$7 ? 
null > : field$6.getRow(((int) 0) - 1, 4); > if (isNull$7) { > result$9 = > org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8; > isNull$9 = true; > } > else { > isNull$8 = result$7.isNullAt(0); > field$8 = > org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8; > if (!isNull$8) { > field$8 = result$7.getString(0); > } > result$9 = field$8; > isNull$9 = isNull$8; > } > if (isNull$9) { > out.setNullAt(2); > } else { > out.setNonPrimitiveValue(2, result$9); > } > if (isNull$4) { > out.setNullAt(3); > } else { > out.setNonPrimitiveValue(3, field$6); > } > output.collect(outElement.replace(out)); > } > @Override > public void close() throws Exception { > super.close(); > } > } > > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)