Hi Lars,

You can take a look at how org.apache.flink.streaming.api.datastream.WindowedStream#WindowedStream constructs the graph under the hood. In particular, it uses org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder, which constructs the InternalWindowFunction you are looking for.
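
As a rough, untested sketch of what that could look like for your case: as far as I remember, for a plain aggregate() the builder wraps a PassThroughWindowFunction in an InternalSingleValueWindowFunction, so that is what I would try in place of the missing argument. Please verify this against the Flink version you are on. The case classes and the summing MyAggregator below are hypothetical stand-ins for your own types, and WindowOperatorSketch/buildHarness are just names I made up.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.AggregatingStateDescriptor
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

// Hypothetical stand-ins for the types in the question.
case class MyInputType(a: String, b: String, value: Long)
case class MyAggregateState(sum: Long)
case class MyOutputType(sum: Long)

// Hypothetical aggregator: sums `value` per session.
class MyAggregator extends AggregateFunction[MyInputType, MyAggregateState, MyOutputType] {
  override def createAccumulator(): MyAggregateState = MyAggregateState(0L)
  override def add(in: MyInputType, acc: MyAggregateState): MyAggregateState =
    MyAggregateState(acc.sum + in.value)
  override def getResult(acc: MyAggregateState): MyOutputType = MyOutputType(acc.sum)
  override def merge(x: MyAggregateState, y: MyAggregateState): MyAggregateState =
    MyAggregateState(x.sum + y.sum)
}

object WindowOperatorSketch {
  type Key = (String, String)

  val keySelector = new KeySelector[MyInputType, Key] {
    override def getKey(value: MyInputType): Key = (value.a, value.b)
  }

  // The harness class lives in the flink-streaming-java test-jar.
  def buildHarness(): KeyedOneInputStreamOperatorTestHarness[Key, MyInputType, MyOutputType] = {
    val config = new ExecutionConfig

    // Pass a serializer so the descriptor's serializer is initialized up front.
    val stateDesc = new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
      "window-contents",
      new MyAggregator,
      createTypeInformation[MyAggregateState].createSerializer(config))

    // The window state holds the aggregate result, so the operator's ACC type is MyOutputType.
    val operator = new WindowOperator[Key, MyInputType, MyOutputType, MyOutputType, TimeWindow](
      EventTimeSessionWindows.withGap(Time.seconds(10)),
      new TimeWindow.Serializer(),
      keySelector,
      createTypeInformation[Key].createSerializer(config),
      stateDesc,
      // Assumption: forwards the single aggregated value per window unchanged.
      new InternalSingleValueWindowFunction[MyOutputType, MyOutputType, Key, TimeWindow](
        new PassThroughWindowFunction[Key, TimeWindow, MyOutputType]),
      EventTimeTrigger.create(),
      0L,   // allowed lateness
      null) // no side output for late data

    new KeyedOneInputStreamOperatorTestHarness[Key, MyInputType, MyOutputType](
      operator, keySelector, createTypeInformation[Key])
  }
}
```

In a test you would then call open() on the harness, feed records with processElement(value, timestamp), advance event time with processWatermark(...) past the end of the session, and inspect extractOutputValues().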

You could also use the regular DataStream API to construct the operator and access it for the test harness via something like dataStream.getTransformation().getOperator(). This avoids calling too many of the internal classes.
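
A minimal, untested sketch of that second route, assuming the Scala DataStream API and the harness from the flink-streaming-java test-jar. The case classes, the summing MyAggregator, and the object name are hypothetical placeholders for your own code. The source elements are never executed; the pipeline is only built so the operator can be pulled out of it.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness

// Hypothetical stand-ins for the types in the question.
case class MyInputType(a: String, b: String, value: Long)
case class MyAggregateState(sum: Long)
case class MyOutputType(sum: Long)

// Hypothetical aggregator: sums `value` per session.
class MyAggregator extends AggregateFunction[MyInputType, MyAggregateState, MyOutputType] {
  override def createAccumulator(): MyAggregateState = MyAggregateState(0L)
  override def add(in: MyInputType, acc: MyAggregateState): MyAggregateState =
    MyAggregateState(acc.sum + in.value)
  override def getResult(acc: MyAggregateState): MyOutputType = MyOutputType(acc.sum)
  override def merge(x: MyAggregateState, y: MyAggregateState): MyAggregateState =
    MyAggregateState(x.sum + y.sum)
}

object SessionAggregateHarnessSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build the same pipeline as in production; the source is only a placeholder.
    val windowed: DataStream[MyOutputType] = env
      .fromElements(MyInputType("placeholder", "placeholder", 0L))
      .keyBy(in => (in.a, in.b))
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .aggregate(new MyAggregator)

    // Pull the window operator that the DataStream API constructed.
    val operator = windowed.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[MyInputType, MyOutputType]]
      .getOperator

    val harness =
      new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
        operator,
        new KeySelector[MyInputType, (String, String)] {
          override def getKey(value: MyInputType): (String, String) = (value.a, value.b)
        },
        createTypeInformation[(String, String)])

    harness.open()
    // Two records of the same key, 4 s apart, so they fall into one session.
    harness.processElement(MyInputType("x", "y", 1L), 1000L)
    harness.processElement(MyInputType("x", "y", 2L), 5000L)
    // Advance event time beyond the session gap so the window fires.
    harness.processWatermark(20000L)
    println(harness.extractOutputValues())
    harness.close()
  }
}
```

The key selector given to the harness has to produce the same keys as the keyBy in the pipeline, otherwise the keyed state will not line up.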

I hope this helps.

Timo


On 10.12.21 15:46, Lars Skjærven wrote:
Hello,

We're trying to write a test for an implementation of *AggregateFunction* following an *EventTimeSessionWindows.withGap*. We gave it a try using *WindowOperator*(), which we hoped could be used as an argument to *KeyedOneInputStreamOperatorTestHarness*. We're a bit stuck, and we're hoping someone has a tip or two. Specifically, we can't find the right *InternalWindowFunction* to pass to WindowOperator(). Below, *MyAggregator* is our implementation of *AggregateFunction*.

Does anyone have a template, or guide, to test a windowed aggregate function?

Kind regards,
Lars


     val myWindowOperator = new WindowOperator(
       EventTimeSessionWindows.withGap(Time.seconds(10)),
       new TimeWindow.Serializer(),
       new KeySelector[MyInputType, (String, String)] {
         override def getKey(value: MyInputType): (String, String) = {
           (value.a, value.b)
         }
       },
       // the key serializer has to match the (String, String) key type
       createTuple2TypeInformation(Types.STRING, Types.STRING).createSerializer(
         new ExecutionConfig
       ),
       // pass a serializer so the descriptor's serializer is initialized up front
       new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
         "test", new MyAggregator,
         createTypeInformation[MyAggregateState].createSerializer(new ExecutionConfig)
       ),
       ???,
       EventTimeTrigger.create(),
       0,
       null
     )

     testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
       myWindowOperator,
       new KeySelector[MyInputType, (String, String)] {
         override def getKey(value: MyInputType): (String, String) = {
           (value.a, value.b)
         }
       },
       createTuple2TypeInformation(Types.STRING, Types.STRING)
     )



