Robert Metzger created FLINK-13738:
--------------------------------------
Summary: NegativeArraySizeException in LongHybridHashTable
Key: FLINK-13738
URL: https://issues.apache.org/jira/browse/FLINK-13738
Project: Flink
Issue Type: Task
Components: Table SQL / Runtime
Affects Versions: 1.9.0
Reporter: Robert Metzger
Executing this (meaningless) query:
{code:sql}
INSERT INTO sinkTable (
  SELECT CONCAT(CAST(id AS VARCHAR), CAST(COUNT(*) AS VARCHAR)) AS something, 'const'
  FROM CsvTable, table1
  WHERE sometxt LIKE 'a%' AND id = key
  GROUP BY id
)
{code}
leads to the following exception:
{code:java}
Caused by: java.lang.NegativeArraySizeException
    at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.tryDenseMode(LongHybridHashTable.java:216)
    at org.apache.flink.table.runtime.hashtable.LongHybridHashTable.endBuild(LongHybridHashTable.java:105)
    at LongHashJoinOperator$36.endInput1$(Unknown Source)
    at LongHashJoinOperator$36.endInput(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:256)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.checkFinished(StreamTwoInputSelectableProcessor.java:359)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor.processInput(StreamTwoInputSelectableProcessor.java:193)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:687)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:517)
    at java.lang.Thread.run(Thread.java:748)
{code}
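The report does not state a root cause, so the following is only a hedged illustration. One common way a NegativeArraySizeException can arise when an array is sized from a range of long keys is arithmetic overflow: the difference between the largest and smallest key wraps around to a negative number, which is then used as an array length. The sketch below is not Flink's code, and whether this is what happens in tryDenseMode is an assumption. The names DenseRangeSketch, unsafeRange, and checkedRange are hypothetical.

```java
public class DenseRangeSketch {

    // Hypothetical helper (not Flink's code): number of slots needed for a
    // dense array indexed by (key - minKey). The arithmetic wraps silently.
    static long unsafeRange(long minKey, long maxKey) {
        return maxKey - minKey + 1;
    }

    // Overflow-aware variant: returns -1 when the range overflows,
    // is not positive, or exceeds maxSlots.
    static long checkedRange(long minKey, long maxKey, long maxSlots) {
        try {
            long range = Math.addExact(Math.subtractExact(maxKey, minKey), 1L);
            return (range > 0 && range <= maxSlots) ? range : -1L;
        } catch (ArithmeticException overflow) {
            return -1L;
        }
    }

    public static void main(String[] args) {
        // Assumed extreme keys, chosen so that maxKey - minKey overflows.
        long minKey = Long.MIN_VALUE + 10;
        long maxKey = Long.MAX_VALUE - 10;

        long range = unsafeRange(minKey, maxKey); // wraps to a negative value
        try {
            long[] dense = new long[(int) range];
            System.out.println("allocated " + dense.length + " slots");
        } catch (NegativeArraySizeException e) {
            System.out.println("unsafe range " + range + " -> " + e);
        }

        // The checked variant signals "do not use a dense array" instead.
        System.out.println("checked: " + checkedRange(minKey, maxKey, 1 << 20));
    }
}
```

A guard of this kind would let the caller fall back to a non-dense layout whenever the key range cannot be represented, rather than attempting the allocation.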
This is the plan:
{code:java}
== Abstract Syntax Tree ==
LogicalSink(name=[sinkTable], fields=[f0, f1])
+- LogicalProject(something=[CONCAT(CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", CAST($1):VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL)], EXPR$1=[_UTF-16LE'const'])
   +- LogicalAggregate(group=[{0}], agg#0=[COUNT()])
      +- LogicalProject(id=[$1])
         +- LogicalFilter(condition=[AND(LIKE($0, _UTF-16LE'a%'), =($1, CAST($2):BIGINT))])
            +- LogicalJoin(condition=[true], joinType=[inner])
               :- LogicalTableScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]])
               +- LogicalTableScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]])

== Optimized Logical Plan ==
Sink(name=[sinkTable], fields=[f0, f1]): rowcount = 1498810.6659336376, cumulative cost = {4.459964319978008E8 rows, 1.879799762133187E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
+- Calc(select=[CONCAT(CAST(id), CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1]): rowcount = 1498810.6659336376, cumulative cost = {4.444976213318672E8 rows, 1.8796498810665936E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
   +- HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1]): rowcount = 1498810.6659336376, cumulative cost = {4.429988106659336E8 rows, 1.8795E10 cpu, 4.8E9 io, 8.4E8 network, 1.799524266373455E8 memory}
      +- Calc(select=[id]): rowcount = 1.575E7, cumulative cost = {4.415E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
         +- HashJoin(joinType=[InnerJoin], where=[=(id, key0)], select=[id, key0], build=[left]): rowcount = 1.575E7, cumulative cost = {4.2575E8 rows, 1.848E10 cpu, 4.8E9 io, 8.4E8 network, 1.2E8 memory}
            :- Exchange(distribution=[hash[id]]): rowcount = 5000000.0, cumulative cost = {1.1E8 rows, 8.4E8 cpu, 2.0E9 io, 4.0E7 network, 0.0 memory}
            :  +- Calc(select=[id], where=[LIKE(sometxt, _UTF-16LE'a%')]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
            :     +- TableSourceScan(table=[[default_catalog, default_database, CsvTable, source: [CsvTableSource(read fields: sometxt, id)]]], fields=[sometxt, id]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
            +- Exchange(distribution=[hash[key0]]): rowcount = 1.0E8, cumulative cost = {3.0E8 rows, 1.68E10 cpu, 2.8E9 io, 8.0E8 network, 0.0 memory}
               +- Calc(select=[CAST(key) AS key0]): rowcount = 1.0E8, cumulative cost = {2.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}
                  +- TableSourceScan(table=[[default_catalog, default_database, table1, source: [GeneratorTableSource(key, rowtime, payload)]]], fields=[key, rowtime, payload]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 2.8E9 io, 0.0 network, 0.0 memory}

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: sometxt, id)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table=[default_catalog.default_database.CsvTable, source: [CsvTableSource(read fields: sometxt, id)]], fields=[sometxt, id])
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(select=[id], where=[(sometxt LIKE _UTF-16LE'a%')])
ship_strategy : FORWARD
Stage 6 : Data Source
content : collect elements with CollectionInputFormat
Stage 7 : Operator
content : SourceConversion(table=[default_catalog.default_database.table1, source: [GeneratorTableSource(key, rowtime, payload)]], fields=[key, rowtime, payload])
ship_strategy : FORWARD
Stage 8 : Operator
content : Calc(select=[CAST(key) AS key0])
ship_strategy : FORWARD
Stage 10 : Operator
content : HashJoin(joinType=[InnerJoin], where=[(id = key0)], select=[id, key0], build=[left])
ship_strategy : HASH[id]
Stage 11 : Operator
content : Calc(select=[id])
ship_strategy : FORWARD
Stage 12 : Operator
content : HashAggregate(isMerge=[false], groupBy=[id], select=[id, COUNT(*) AS $f1])
ship_strategy : FORWARD
Stage 13 : Operator
content : Calc(select=[(CAST(id) CONCAT CAST($f1)) AS something, _UTF-16LE'const' AS EXPR$1])
ship_strategy : FORWARD
Stage 14 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 15 : Operator
content : Map
ship_strategy : FORWARD
Stage 16 : Data Sink
content : Sink: CsvTableSink(f0, f1)
ship_strategy : FORWARD
{code}