[ https://issues.apache.org/jira/browse/FLINK-13738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young updated FLINK-13738:
-------------------------------
    Fix Version/s: 1.10.0

> NegativeArraySizeException in LongHybridHashTable
> -------------------------------------------------
>
>                 Key: FLINK-13738
>                 URL: https://issues.apache.org/jira/browse/FLINK-13738
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Robert Metzger
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Executing this (meaningless) query:
> {code:java}
> INSERT INTO sinkTable (
>   SELECT CONCAT( CAST( id AS VARCHAR), CAST( COUNT(*) AS VARCHAR)) as something, 'const'
>   FROM CsvTable, table1
>   WHERE sometxt LIKE 'a%' AND id = key
>   GROUP BY id )
> {code}
> leads to the following exception:
> {code:java}
> Caused by: java.lang.NegativeArraySizeException
>     at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.tryDenseMode(LongHybridHashTable.java:216)
>     at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.endBuild(LongHybridHashTable.java:105)
>     at LongHashJoinOperator$36.endInput1$(Unknown Source)
>     at LongHashJoinOperator$36.endInput(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:256)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.checkFinished(StreamTwoInputSelectableProcessor.java:359)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.processInput(StreamTwoInputSelectableProcessor.java:193)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:687)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
>     at java.lang.Thread.run(Thread.java:748){code}
> This is the plan:
>
> {code:java}
> == Abstract Syntax Tree ==
> LogicalSink(name=[sinkTable], fields=[f0, f1])
> +- LogicalProject(something=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL)], EXPR$1=[_UTF-16LE'const'])
>    +- LogicalAggregate(group=[{0}], agg#0=[COUNT()])
>       +- LogicalProject(id=[$1])
>          +- LogicalFilter(condition=[AND(LIKE($0, _UTF-16LE'a%'), =($1, CAST($2):BIGINT))])
>             +- LogicalJoin(condition=[true], joinType=[inner])
>                :- LogicalTableScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]])
>                +- LogicalTableScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]])
>
> == Optimized Logical Plan ==
> Sink(name=[sinkTable], fields=[f0, f1]): rowcount = 1498810.6659336376, cumulative cost = {4.459964319978008E8 rows, 1.879799762133187E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
> +- Calc(select=[CONCAT(CAST(id), CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1]): rowcount = 1498810.6659336376, cumulative cost = {4.444976213318672E8 rows, 1.8796498810665936E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
>    +- HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1]): rowcount = 1498810.6659336376, cumulative cost = {4.429988106659336E8 rows, 1.8795E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
>       +- Calc(select=[id]): rowcount = 1.575E7, cumulative cost = {4.415E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
>          +- HashJoin(joinType=[InnerJoin], where=[=(id, key0)], select=[id, key0], build=[left]): rowcount = 1.575E7, cumulative cost = {4.2575E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
>             :- Exchange(distribution=[hash[id]]): rowcount = 5000000.0, cumulative cost = {1.1E8 rows, 8.4E8 cpu, 2.0E9 io, 4.0E7 network, 0.0 memory}
>             :  +- Calc(select=[id], where=[LIKE(sometxt, _UTF-16LE'a%')]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
>             :     +- TableSourceScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]], fields=[sometxt, id]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
>             +- Exchange(distribution=[hash[key0]]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.68E10 cpu, 2.8E9 io, 8.0E8 network, 0.0 memory}
>                +- Calc(select=[CAST(key) AS key0]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}
>                   +- TableSourceScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]], fields=[key, rowtime, payload]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
>     content : collect elements with CollectionInputFormat
>
> Stage 2 : Operator
>     content : CsvTableSource(read fields: sometxt, id)
>     ship_strategy : REBALANCE
>
> Stage 3 : Operator
>     content : SourceConversion(table=[default_catalog.default_database.CsvTable, source: [CsvTableSource(read fields: sometxt, id)]], fields=[sometxt, id])
>     ship_strategy : FORWARD
>
> Stage 4 : Operator
>     content : Calc(select=[id], where=[(sometxt LIKE _UTF-16LE'a%')])
>     ship_strategy : FORWARD
>
> Stage 6 : Data Source
>     content : collect elements with CollectionInputFormat
>
> Stage 7 : Operator
>     content : SourceConversion(table=[default_catalog.default_database.table1, source: [GeneratorTableSource(key, rowtime, payload)]], fields=[key, rowtime, payload])
>     ship_strategy : FORWARD
>
> Stage 8 : Operator
>     content : Calc(select=[CAST(key) AS key0])
>     ship_strategy : FORWARD
>
> Stage 10 : Operator
>     content : HashJoin(joinType=[InnerJoin], where=[(id = key0)], select=[id, key0], build=[left])
>     ship_strategy : HASH[id]
>
> Stage 11 : Operator
>     content : Calc(select=[id])
>     ship_strategy : FORWARD
>
> Stage 12 : Operator
>     content : HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1])
>     ship_strategy : FORWARD
>
> Stage 13 : Operator
>     content : Calc(select=[(CAST(id) CONCAT CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1])
>     ship_strategy : FORWARD
>
> Stage 14 : Operator
>     content : SinkConversionToRow
>     ship_strategy : FORWARD
>
> Stage 15 : Operator
>     content : Map
>     ship_strategy : FORWARD
>
> Stage 16 : Data Sink
>     content : Sink: CsvTableSink(f0, f1)
>     ship_strategy : FORWARD
> {code}
>

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)