Hi David,

Scotty's approach to sliding windows seems really interesting ;)

I also agree that the idea is very interesting. It will be very effective
for our use case as we create a sliding window of size 10 minutes that
slides by 1 minutes for every road of the country.

 Looking at the code [1], it seems to be no longer maintained. It's both
> compiled and tested against Flink 1.8 so I wouldn't really expect it to be
> compatible with 1.14.x :(
>
Yeah, we tried to compile it with Flink 1.14.x but it failed. Scotty
compiled with Flink 1.8 nonetheless seems to work well for about 10~20
minutes when we executed the job on a cluster, so we thought that Flink's
runtime, especially windows, hasn't changed significantly since 1.8.

I found that Flink committers previously discussed implanting the core idea
to Flink [1] but it seems not discussed anymore, unfortunately.

Thanks for the reply :-)

Best,

Dongwon

On Wed, Dec 29, 2021 at 5:53 PM David Morávek <d...@apache.org> wrote:

> Hi Dongwon,
>
> Scotty's approach to sliding windows seems really interesting ;) Looking
> at the code [1], it seems to be no longer maintained. It's both compiled
> and tested against Flink 1.8 so I wouldn't really expect it to be
> compatible with 1.14.x :(
>
> [1] https://github.com/TU-Berlin-DIMA/scotty-window-processor
>
> Best,
> D.
>
> On Tue, Dec 28, 2021 at 4:15 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi community,
>>
>> We're recently trying to adopt Scotty to overcome the poor performance
>> caused by too many sliding windows.
>>
>> We're facing the following exception on the latest Flink-1.14.2:
>>
>> switched from RUNNING to FAILED with failure cause: 
>> java.lang.ArrayIndexOutOfBoundsException: -1
>>      at java.util.ArrayList.elementData(ArrayList.java:424)
>>      at java.util.ArrayList.get(ArrayList.java:437)
>>      at 
>> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.getSlice(LazyAggregateStore.java:53)
>>      at 
>> de.tub.dima.scotty.slicing.aggregationstore.LazyAggregateStore.insertValueToSlice(LazyAggregateStore.java:64)
>>      at 
>> de.tub.dima.scotty.slicing.SliceManager.processElement(SliceManager.java:76)
>>      at 
>> de.tub.dima.scotty.slicing.SlicingWindowOperator.processElement(SlicingWindowOperator.java:43)
>>      at 
>> de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator.processElement(KeyedScottyWindowOperator.java:62)
>>      at 
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
>>      at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>      at 
>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>      at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>>      at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>>      at 
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>      at 
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>      at java.lang.Thread.run(Thread.java:748)
>>
>> We create a window operator as follows:
>>
>> import de.tub.dima.scotty.core.windowType.{SlidingWindow, WindowMeasure}
>> import de.tub.dima.scotty.flinkconnector.KeyedScottyWindowOperator
>> import de.tub.dima.scotty.flinkconnector.{_}
>>
>>     val windowOp = new KeyedScottyWindowOperator[(java.lang.String, Long), 
>> NaviGpsProcessable, NaviTrafficUserResult](new 
>> NaviTrafficUserAggregationScotty())
>>     windowOp.addWindow(new SlidingWindow(WindowMeasure.Time, 600_000, 
>> 60_000))
>>
>>     val userAggStream = stream
>>       .keyBy(el => (el.id, el.trafficId))
>>       .process(windowOp)
>>       .map(_.getAggValues.get(0));
>>
>> Can I get any advice on this?
>>
>> Best,
>>
>> Dongwon
>>
>

Reply via email to