Whoops... as usual, posting led me to find some answers myself. Does the
following make sense given my requirements?

Thanks!

private class MyWindowAssigner(val windowSize: Time) :
    WindowAssigner<Record, TimeWindow>() {

    // Fire on every element. CountTrigger is declared as Trigger<Object, W>,
    // hence the unchecked cast to the element type.
    @Suppress("UNCHECKED_CAST")
    private val trigger =
        CountTrigger.of<TimeWindow>(1) as Trigger<Record, TimeWindow>

    // One fixed-size window per event, ending at the event's timestamp.
    override fun assignWindows(
        element: Record,
        timestamp: Long,
        context: WindowAssignerContext
    ): MutableCollection<TimeWindow> {
        return mutableListOf(
            TimeWindow(timestamp - windowSize.toMilliseconds(), timestamp)
        )
    }

    override fun getDefaultTrigger(
        env: StreamExecutionEnvironment?
    ): Trigger<Record, TimeWindow> {
        return trigger
    }

    override fun getWindowSerializer(
        executionConfig: ExecutionConfig?
    ): TypeSerializer<TimeWindow> {
        return TimeWindow.Serializer()
    }

    override fun isEventTime(): Boolean {
        return true
    }
}
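
To make the window arithmetic concrete, here is a small Flink-free sketch of
the bounds the assigner above yields for a few events. Bounds and windowFor
are made-up names for illustration, not Flink API, and the 10-second size is
an arbitrary assumption. One thing it makes visible: TimeWindow treats its end
as exclusive (maxTimestamp() is end - 1), so each window stops just short of
the timestamp of the event that created it. Whether that matters probably
depends on how the aggregate is computed.

```kotlin
// Hypothetical stand-in for Flink's TimeWindow: start inclusive, end exclusive.
data class Bounds(val start: Long, val end: Long) {
    // Mirrors TimeWindow.maxTimestamp(), which returns end - 1.
    val maxTimestamp: Long get() = end - 1
}

// Same arithmetic as assignWindows() above: one window per event,
// ending at the event's own timestamp.
fun windowFor(timestamp: Long, windowSizeMs: Long): Bounds =
    Bounds(timestamp - windowSizeMs, timestamp)

fun main() {
    val windowSizeMs = 10_000L // assumed window size, for illustration only
    for (ts in listOf(12_000L, 12_500L, 19_000L)) {
        val w = windowFor(ts, windowSizeMs)
        println("event@$ts -> [${w.start}, ${w.end}), maxTimestamp=${w.maxTimestamp}")
    }
}
```

If the triggering event itself should fall inside its window, one option might
be to end the window at timestamp + 1 instead; I haven't tried that.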


On Tue, Oct 20, 2020 at 9:13 AM Alex Cruise <a...@cluonflux.com> wrote:

> Hey folks!
>
> I have an application that wants to use "stepless" sliding windows, i.e.
> we produce aggregates on every event. The windows need to be of a fixed
> size, but to have their start and end times update continuously, and I'd
> like to trigger on every event. Is this a bad idea? I've googled and read
> the docs extensively and haven't been able to identify built-in
> functionality or examples that map cleanly to my requirements.
>
> OK, I just found DeltaTrigger, which looks promising... Does it make sense
> to write a WindowAssigner that makes a new Window on every event,
> allocation rates aside?
>
> Thanks!
>
> -0xe1a
>
