Hi Dongwon,

Scotty's approach to sliding windows seems really interesting ;) Looking at
the code [1], it seems to be no longer maintained. It's both compiled and
tested against Flink 1.8 so I wouldn't really expect it to be compatible
with 1.14.x :(

[1] https://github.com/TU-Berlin-DIMA/scotty-window-processor

Best,
D.

On Tue, Dec 28, 2021 at 4:15 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi community,
>
> We're recently trying to adopt Scotty to overcome the poor performance
> caused by too many sliding windows.
>
> We're facing the following exception on the latest Flink-1.14.2:
>
> switched from RUNNING to FAILED with failure cause: 
> java.lang.ArrayIndexOutOfBoundsException: -1
>       at java.util.ArrayList.elementData(ArrayList.java:424)
>       at java.util.ArrayList.get(ArrayList.java:437)
>       at 
> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
>       at 
> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
>       at 
> de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
>       at 
> de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
>       at 
> de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62)
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>       at java.lang.Thread.run(Thread.java:748)
>
> We create a window operator as follows:
>
> import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure}
> import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator
> import de.tub.dima.scotty.flinkconnector.{_}
>
>     val windowOp = new KeyedScottyWindowOperator[(java.lang.String, Long), 
> NaviGpsProcessable, NaviTrafficUserResult](new 
> NaviTrafficUserAggregationScotty())
>     windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, 60_000))
>
>     val userAggStream = stream
>       .keyBy(el => (el.id, el.trafficId))
>       .process(windowOp)
>       .map(_.getAggValues.get(0));
>
> Can I get any advice on this?
>
> Best,
>
> Dongwon
>

Reply via email to