Hi, Sorry for the mistake, [1] is related, but this bug has been fixed totally in [2], so the safe version should be 1.9.3+ and 1.10.1+, so there is no safe released version now.
1.10.1 will been released very soon. [1]https://issues.apache.org/jira/browse/FLINK-13702 [2]https://issues.apache.org/jira/browse/FLINK-16242 Best, Jingsong Lee On Wed, Apr 22, 2020 at 4:50 PM Jingsong Li <jingsongl...@gmail.com> wrote: > Hi, > > Just like Jark said, it may be FLINK-13702[1]. Has been fixed in 1.9.2 and > later versions. > > > Can it be a thread-safe problem or something else? > > Yes, it is a thread-safe problem with lazy materialization. > > [1]https://issues.apache.org/jira/browse/FLINK-13702 > > Best, > Jingsong Lee > > On Tue, Apr 21, 2020 at 1:21 PM forideal <fszw...@163.com> wrote: > >> Hi Kurt: >> I had the same mistake. >> >> sql: >> insert into >> dw_access_log select get_json_value(query_nor, query_nor_counter) as >> `value` from ods_access_log_source group by tumble (time_key, interval >> '1' MINUTE), >> group_key >> >> get_json_value >> >> public class GetJsonValue extends AggregateFunction<String, Map<String, >> Long>> { >> @Override >> public boolean isDeterministic() { >> return false; >> } >> >> @Override >> public Map<String, Long> createAccumulator() { >> return new HashMap<>(); >> } >> >> @Override >> public void open(FunctionContext context) throws Exception { >> >> } >> >> public void accumulate(Map<String, Long> datas, String key, long value) { >> datas.put(key, value); >> } >> >> @Override >> public String getValue(Map<String, Long> acc) { >> return JSON.toJSONString(acc); >> } >> >> >> @Override >> public TypeInformation getResultType() { >> return Types.STRING; >> } >> >> } >> >> >> Best forideal >> >> >> >> >> At 2020-04-21 10:05:05, "Kurt Young" <ykt...@gmail.com> wrote: >> >> Thanks, once you can reproduce this issue locally, please open a jira >> with your testing program. >> >> Best, >> Kurt >> >> >> On Tue, Apr 21, 2020 at 8:48 AM 刘建刚 <liujiangangp...@gmail.com> wrote: >> >>> Thank you. It is an online job and my input is huge. I check the trace >>> and find that the array is resized when the array is not enough. The code >>> is as below: >>> >>> public void add (int value) { >>> int[] items = this.items; >>> if (size == items.length) items = resize(Math.max(8, (int)(size * >>> 1.75f))); >>> items[size++] = value; >>> } >>> >>> >>> Only blink planner has this error. Can it be a thread-safe problem or >>> something else? I will try to reproduce it locally. >>> >>> 2020年4月21日 上午12:20,Jark Wu-3 [via Apache Flink User Mailing List >>> archive.] <ml+s2336050n34491...@n4.nabble.com> 写道: >>> >>> Hi, >>> >>> Are you using versions < 1.9.2? From the exception stack, it looks like >>> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0. >>> Could you try it using 1.9.2? >>> >>> Best, >>> Jark >>> >>> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote: >>> >>>> Can you reproduce this in a local program with mini-cluster? >>>> >>>> Best, >>>> Kurt >>>> >>>> >>>> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote: >>>> >>>>> You can read this for this type error. >>>>> >>>>> >>>>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446 >>>>> >>>>> I would suggest you set break points in your code. Step through the >>>>> code, this method should show you which array variable is being passed a >>>>> null argument when the array variable is not null able. >>>>> >>>>> >>>>> >>>>> >>>>> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote: >>>>> >>>>>> I am using Roaring64NavigableMap to compute uv. It is ok to us >>>>>> flink planner and not ok with blink planner. The SQL is as following: >>>>>> >>>>>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as >>>>>> curTimestamp, A, B, C, D, >>>>>> E, uv(bitmap(id)) as bmp >>>>>> FROM person >>>>>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E >>>>>> >>>>>> >>>>>> The udf is as following: >>>>>> >>>>>> public static class Bitmap extends >>>>>> AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> { >>>>>> @Override >>>>>> public Roaring64NavigableMap createAccumulator() { >>>>>> return new Roaring64NavigableMap(); >>>>>> } >>>>>> >>>>>> @Override >>>>>> public Roaring64NavigableMap getValue(Roaring64NavigableMap >>>>>> accumulator) { >>>>>> return accumulator; >>>>>> } >>>>>> >>>>>> public void accumulate(Roaring64NavigableMap bitmap, long id) { >>>>>> bitmap.add(id); >>>>>> } >>>>>> } >>>>>> >>>>>> public static class UV extends ScalarFunction { >>>>>> public long eval(Roaring64NavigableMap bitmap) { >>>>>> return bitmap.getLongCardinality(); >>>>>> } >>>>>> } >>>>>> >>>>>> The error is as following: >>>>>> >>>>>> 2020-04-20 16:37:13,868 INFO >>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph >>>>>> [flink-akka.actor.default-dispatcher-40] - >>>>>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName, >>>>>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)], >>>>>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand, >>>>>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5, >>>>>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, >>>>>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS >>>>>> curTimestamp, brand, platform, channel, versionName, appMajorVersion, >>>>>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> >>>>>> Sink: >>>>>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from >>>>>> RUNNING >>>>>> to FAILED. >>>>>> java.lang.ArrayIndexOutOfBoundsException: -1 >>>>>> at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61) >>>>>> at >>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800) >>>>>> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655) >>>>>> at >>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262) >>>>>> at >>>>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62) >>>>>> at >>>>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37) >>>>>> at >>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150) >>>>>> at >>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117) >>>>>> at >>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50) >>>>>> at >>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297) >>>>>> at >>>>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244) >>>>>> at >>>>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138) >>>>>> at >>>>>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) >>>>>> at >>>>>> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487) >>>>>> at >>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) >>>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532) >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>> >>>>>> Do I need register Roaring64NavigableMap somewhere? Anyone can >>>>>> help me? Thank you. >>>>>> >>>>>> >>> >>> ------------------------------ >>> If you reply to this email, your message will be added to the discussion >>> below: >>> >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Blink-SQL-java-lang-ArrayIndexOutOfBoundsException-tp34467p34491.html >>> To start a new topic under Apache Flink User Mailing List archive., email >>> ml+s2336050n1...@n4.nabble.com >>> To unsubscribe from Apache Flink User Mailing List archive., click here >>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=> >>> . >>> NAML >>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml> >>> >>> >>> >> >> >> > > > -- > Best, Jingsong Lee > -- Best, Jingsong Lee