Hi David, Scotty's approach to sliding windows seems really interesting ;)
I also agree that the idea is very interesting. It will be very effective for our use case as we create a sliding window of size 10 minutes that slides by 1 minutes for every road of the country. Looking at the code [1], it seems to be no longer maintained. It's both > compiled and tested against Flink 1.8 so I wouldn't really expect it to be > compatible with 1.14.x :( > Yeah, we tried to compile it with Flink 1.14.x but it failed. Scotty compiled with Flink 1.8 nonetheless seems to work well for about 10~20 minutes when we executed the job on a cluster, so we thought that Flink's runtime, especially windows, hasn't changed significantly since 1.8. I found that Flink committers previously discussed implanting the core idea to Flink [1] but it seems not discussed anymore, unfortunately. Thanks for the reply :-) Best, Dongwon On Wed, Dec 29, 2021 at 5:53 PM David Morávek <d...@apache.org> wrote: > Hi Dongwon, > > Scotty's approach to sliding windows seems really interesting ;) Looking > at the code [1], it seems to be no longer maintained. It's both compiled > and tested against Flink 1.8 so I wouldn't really expect it to be > compatible with 1.14.x :( > > [1] https://github.com/TU-Berlin-DIMA/scotty-window-processor > > Best, > D. > > On Tue, Dec 28, 2021 at 4:15 AM Dongwon Kim <eastcirc...@gmail.com> wrote: > >> Hi community, >> >> We're recently trying to adopt Scotty to overcome the poor performance >> caused by too many sliding windows. >> >> We're facing the following exception on the latest Flink-1.14.2: >> >> switched from RUNNING to FAILED with failure cause: >> java.lang.ArrayIndexOutOfBoundsException: -1 >> at java.util.ArrayList.elementData(ArrayList.java:424) >> at java.util.ArrayList.get(ArrayList.java:437) >> at >> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53) >> at >> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64) >> at >> de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76) >> at >> de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43) >> at >> de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62) >> at >> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) >> at >> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) >> at >> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >> at >> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >> at >> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) >> at >> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) >> at >> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) >> at >> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) >> at java.lang.Thread.run(Thread.java:748) >> >> We create a window operator as follows: >> >> import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure} >> import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator >> import de.tub.dima.scotty.flinkconnector.{_} >> >> val windowOp = new KeyedScottyWindowOperator[(java.lang.String, Long), >> NaviGpsProcessable, NaviTrafficUserResult](new >> NaviTrafficUserAggregationScotty()) >> windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, >> 60_000)) >> >> val userAggStream = stream >> .keyBy(el => (el.id, el.trafficId)) >> .process(windowOp) >> .map(_.getAggValues.get(0)); >> >> Can I get any advice on this? >> >> Best, >> >> Dongwon >> >