Hello,

We're trying to write a test for an implementation of *AggregateFunction* that is applied after an *EventTimeSessionWindows.withGap* window. We tried constructing a *WindowOperator* directly, hoping to pass it as an argument to *KeyedOneInputStreamOperatorTestHarness*, but we're a bit stuck and hoping someone has a tip or two. Specifically, we can't find the right *InternalWindowFunction* to pass to the *WindowOperator* constructor. In the code below, *MyAggregator* is our implementation of *AggregateFunction*.

Does anyone have a template or guide for testing a windowed aggregate function?

Kind regards,
Lars

val myWindowOperator = new WindowOperator(
  EventTimeSessionWindows.withGap(Time.seconds(10)),
  new TimeWindow.Serializer(),
  new KeySelector[MyInputType, (String, String)] {
    override def getKey(value: MyInputType): (String, String) = {
      (value.a, value.b)
    }
  },
  Types.TUPLE(Types.STRING).createSerializer(
    new ExecutionConfig
  ),
  new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
    "test",
    new MyAggregator,
    classOf[MyAggregateState],
  ),
  ???,
  EventTimeTrigger.create(),
  0,
  null
)

testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
  myWindowOperator,
  new KeySelector[MyInputType, (String, String)] {
    override def getKey(value: MyInputType): (String, String) = {
      (value.a, value.b)
    }
  },
  createTuple2TypeInformation(Types.STRING, Types.STRING)
)