Hi,

Sorry for the mistake, [1] is related, but this bug has been fixed totally
in [2], so the safe version should be 1.9.3+ and 1.10.1+, so there is no
safe released version now.

1.10.1 will been released very soon.

[1]https://issues.apache.org/jira/browse/FLINK-13702
[2]https://issues.apache.org/jira/browse/FLINK-16242

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 4:50 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi,
>
> Just like Jark said, it may be FLINK-13702[1]. Has been fixed in 1.9.2 and
> later versions.
>
> > Can it be a thread-safe problem or something else?
>
> Yes, it is a thread-safe problem with lazy materialization.
>
> [1]https://issues.apache.org/jira/browse/FLINK-13702
>
> Best,
> Jingsong Lee
>
> On Tue, Apr 21, 2020 at 1:21 PM forideal <fszw...@163.com> wrote:
>
>> Hi Kurt:
>>     I had the same mistake.
>>
>>    sql:
>>    insert into
>> dw_access_log select get_json_value(query_nor, query_nor_counter) as
>> `value` from ods_access_log_source group by tumble (time_key, interval
>> '1' MINUTE),
>> group_key
>>
>> get_json_value
>>
>> public class GetJsonValue extends AggregateFunction<String, Map<String, 
>> Long>> {
>>     @Override
>>     public boolean isDeterministic() {
>>         return false;
>>     }
>>
>>     @Override
>>     public Map<String, Long> createAccumulator() {
>>         return new HashMap<>();
>>     }
>>
>>     @Override
>>     public void open(FunctionContext context) throws Exception {
>>
>>     }
>>
>>     public void accumulate(Map<String, Long> datas, String key, long value) {
>>         datas.put(key, value);
>>     }
>>
>>     @Override
>>     public String getValue(Map<String, Long> acc) {
>>         return JSON.toJSONString(acc);
>>     }
>>
>>
>>     @Override
>>     public TypeInformation getResultType() {
>>         return Types.STRING;
>>     }
>>
>> }
>>
>>
>> Best forideal
>>
>>
>>
>>
>> At 2020-04-21 10:05:05, "Kurt Young" <ykt...@gmail.com> wrote:
>>
>> Thanks, once you can reproduce this issue locally, please open a jira
>> with your testing program.
>>
>> Best,
>> Kurt
>>
>>
>> On Tue, Apr 21, 2020 at 8:48 AM 刘建刚 <liujiangangp...@gmail.com> wrote:
>>
>>> Thank you. It is an online job and my input is huge. I check the trace
>>> and find that the array is resized when the array is not enough. The code
>>> is as below:
>>>
>>> public void add (int value) {
>>>    int[] items = this.items;
>>>    if (size == items.length) items = resize(Math.max(8, (int)(size * 
>>> 1.75f)));
>>>    items[size++] = value;
>>> }
>>>
>>>
>>> Only blink planner has this error. Can it be a thread-safe problem or
>>> something else? I will try to reproduce it locally.
>>>
>>> 2020年4月21日 上午12:20,Jark Wu-3 [via Apache Flink User Mailing List
>>> archive.] <ml+s2336050n34491...@n4.nabble.com> 写道:
>>>
>>> Hi,
>>>
>>> Are you using versions < 1.9.2? From the exception stack, it looks like
>>> caused by FLINK-13702, which is already fixed in 1.9.2 and 1.10.0.
>>> Could you try it using 1.9.2?
>>>
>>> Best,
>>> Jark
>>>
>>> On Mon, 20 Apr 2020 at 21:00, Kurt Young <[hidden email]> wrote:
>>>
>>>> Can you reproduce this in a local program with mini-cluster?
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Mon, Apr 20, 2020 at 8:07 PM Zahid Rahman <[hidden email]> wrote:
>>>>
>>>>> You can read this for this type error.
>>>>>
>>>>>
>>>>> https://stackoverflow.com/questions/28189446/i-always-get-this-error-exception-in-thread-main-java-lang-arrayindexoutofbou#comment44747327_28189446
>>>>>
>>>>> I would suggest you set break points  in your code. Step through the
>>>>> code, this  method should show you which array variable is being passed a
>>>>> null argument when the array variable is not null able.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, 20 Apr 2020, 10:07 刘建刚, <[hidden email]> wrote:
>>>>>
>>>>>>        I am using Roaring64NavigableMap to compute uv. It is ok to us
>>>>>> flink planner and not ok with blink planner. The SQL is as following:
>>>>>>
>>>>>> SELECT toLong(TUMBLE_START(eventTime, interval '1' minute)) as 
>>>>>> curTimestamp, A, B, C, D,
>>>>>>         E, uv(bitmap(id)) as bmp
>>>>>> FROM person
>>>>>> GROUP BY TUMBLE(eventTime, interval '1' minute), A, B, C, D, E
>>>>>>
>>>>>>
>>>>>>       The udf is as following:
>>>>>>
>>>>>> public static class Bitmap extends 
>>>>>> AggregateFunction<Roaring64NavigableMap, Roaring64NavigableMap> {
>>>>>>    @Override
>>>>>>    public Roaring64NavigableMap createAccumulator() {
>>>>>>       return new Roaring64NavigableMap();
>>>>>>    }
>>>>>>
>>>>>>    @Override
>>>>>>    public Roaring64NavigableMap getValue(Roaring64NavigableMap 
>>>>>> accumulator) {
>>>>>>       return accumulator;
>>>>>>    }
>>>>>>
>>>>>>    public void accumulate(Roaring64NavigableMap bitmap, long id) {
>>>>>>       bitmap.add(id);
>>>>>>    }
>>>>>> }
>>>>>>
>>>>>> public static class UV extends ScalarFunction {
>>>>>>    public long eval(Roaring64NavigableMap bitmap) {
>>>>>>       return bitmap.getLongCardinality();
>>>>>>    }
>>>>>> }
>>>>>>
>>>>>>       The error is as following:
>>>>>>
>>>>>> 2020-04-20 16:37:13,868 INFO
>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>>>>  [flink-akka.actor.default-dispatcher-40]  -
>>>>>> GroupWindowAggregate(groupBy=[brand, platform, channel, versionName,
>>>>>> appMajorVersion], window=[TumblingGroupWindow('w$, eventTime, 60000)],
>>>>>> properties=[w$start, w$end, w$rowtime, w$proctime], select=[brand,
>>>>>> platform, channel, versionName, appMajorVersion, bitmap(id) AS $f5,
>>>>>> start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime,
>>>>>> proctime('w$) AS w$proctime]) -> Calc(select=[toLong(w$start) AS
>>>>>> curTimestamp, brand, platform, channel, versionName, appMajorVersion,
>>>>>> uv($f5) AS bmp]) -> SinkConversionToTuple2 -> (Flat Map, Flat Map -> 
>>>>>> Sink:
>>>>>> Unnamed) (321/480) (8eb918b493ea26e2bb60f8307347dc1a) switched from 
>>>>>> RUNNING
>>>>>> to FAILED.
>>>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>>>>   at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
>>>>>>   at
>>>>>> com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
>>>>>>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
>>>>>>   at
>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:62)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.typeutils.BinaryGenericSerializer.copy(BinaryGenericSerializer.java:37)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copyBaseRow(BaseRowSerializer.java:150)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:117)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.typeutils.BaseRowSerializer.copy(BaseRowSerializer.java:50)
>>>>>>   at
>>>>>> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:297)
>>>>>>   at
>>>>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:244)
>>>>>>   at
>>>>>> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:138)
>>>>>>   at
>>>>>> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>>>>>>   at
>>>>>> org.apache.flink.table.runtime.operators.window.WindowOperator.processElement(WindowOperator.java:337)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.processRecord(OneInputStreamTask.java:204)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:196)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>>>>   at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>>       Do I need register Roaring64NavigableMap somewhere? Anyone can
>>>>>> help me? Thank you.
>>>>>>
>>>>>>
>>>
>>> ------------------------------
>>> If you reply to this email, your message will be added to the discussion
>>> below:
>>>
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Blink-SQL-java-lang-ArrayIndexOutOfBoundsException-tp34467p34491.html
>>> To start a new topic under Apache Flink User Mailing List archive., email
>>>  ml+s2336050n1...@n4.nabble.com
>>> To unsubscribe from Apache Flink User Mailing List archive., click here
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=1&code=bGl1amlhbmdhbmdwZW5nQGdtYWlsLmNvbXwxfC0xMTYwNzM3MjI=>
>>> .
>>> NAML
>>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>>>
>>>
>>>
>>
>>
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee

Reply via email to