Can you reproduce this in a local program with a mini-cluster?

Best,
Kurt

On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <zahidr1...@gmail.com> wrote:

> You can read this for this type of error:
>
> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>
> I would suggest you set breakpoints in your code and step through it.
> This should show you which array variable is being passed a null
> argument when the array variable is not nullable.
>
> On Mon, 20 Apr 2020, 10:07 刘建刚, <liujiangangp...@gmail.com> wrote:
>
>> I am using Roaring64NavigableMap to compute UV. It works with the
>> Flink planner but fails with the Blink planner. The SQL is as follows:
>>
>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as curTimestamp,
>>        A, B, C, D, E, uv(bitmap(id)) as bmp
>> FROM person
>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>
>> The UDFs are as follows:
>>
>> public static class Bitmap extends AggregateFunction<Roaring64NavigableMap,
>>         Roaring64NavigableMap> {
>>     @Override
>>     public Roaring64NavigableMap createAccumulator() {
>>         return new Roaring64NavigableMap();
>>     }
>>
>>     @Override
>>     public Roaring64NavigableMap getValue(Roaring64NavigableMap accumulator) {
>>         return accumulator;
>>     }
>>
>>     public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>         bitmap.add(id);
>>     }
>> }
>>
>> public static class UV extends ScalarFunction {
>>     public long eval(Roaring64NavigableMap bitmap) {
>>         return bitmap.getLongCardinality();
>>     }
>> }
>>
>> The error is as follows:
>>
>> 2020-04-20 16:37:13,868 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph
>> [flink-akka.actor.default-dispatcher-40] -
>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)],
>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> Sink:
>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from RUNNING
>> to FAILED.
>> java.lang.ArrayIndexOutOfBoundsException: -1
>>     at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>     at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>     at org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>     at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>     at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>     at org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>     at org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>     at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>     at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>     at org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> Do I need to register Roaring64NavigableMap somewhere? Can anyone help
>> me? Thank you.
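
One tentative observation from the trace: the exception is thrown inside KryoSerializer.copy, which is reached from HeapValueState.value via CopyOnWriteStateMap.get and BaseRowSerializer.copy. That suggests the accumulator is treated as a generic type and deep-copied through Kryo each time the window operator reads it. The sketch below is not Flink or Kryo code and is not a confirmed fix. It only illustrates the kind of explicit byte round trip that a dedicated serializer for the accumulator could delegate to. `LongSetBitmap`, its `serialize`/`deserialize` methods, and `copy` are hypothetical stand-ins so the example runs without any dependencies; whether the real bitmap class and your Flink version fit this pattern is an assumption to verify.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.TreeSet;

public class BitmapCopySketch {

    /** Hypothetical stand-in for the bitmap accumulator (not the real Roaring64NavigableMap). */
    static final class LongSetBitmap {
        private final TreeSet<Long> values = new TreeSet<>();

        void add(long id) {
            values.add(id);
        }

        long getLongCardinality() {
            return values.size();
        }

        // Writes the element count followed by each element.
        void serialize(DataOutput out) throws IOException {
            out.writeInt(values.size());
            for (long v : values) {
                out.writeLong(v);
            }
        }

        // Replaces the current contents with what serialize() wrote.
        void deserialize(DataInput in) throws IOException {
            values.clear();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                values.add(in.readLong());
            }
        }
    }

    /** Deep copy through an explicit byte round trip instead of generic reflection. */
    static LongSetBitmap copy(LongSetBitmap source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                source.serialize(out);
            }
            LongSetBitmap target = new LongSetBitmap();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                target.deserialize(in);
            }
            return target;
        } catch (IOException e) {
            // In-memory streams should not fail; surface any problem unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        LongSetBitmap original = new LongSetBitmap();
        original.add(1L);
        original.add(42L);
        original.add(42L); // duplicate id, counted once

        LongSetBitmap copied = copy(original);
        copied.add(7L); // must not leak into the original

        // prints "2 3"
        System.out.println(original.getLongCardinality() + " " + copied.getLongCardinality());
    }
}
```

If a mini-cluster reproduction shows the same failure, comparing a round trip like this one against the Kryo-based copy for the same accumulator may help narrow down where the two planners differ.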