Hello,

We're trying to write a test for an implementation of *AggregateFunction*
that is applied after *EventTimeSessionWindows.withGap*. We tried building a
*WindowOperator* directly, hoping it could be passed as an argument to
*KeyedOneInputStreamOperatorTestHarness*. We're a bit stuck, and we're
hoping someone has a tip or two. Specifically, we can't find the right
*InternalWindowFunction* to pass to the *WindowOperator* constructor (the
argument marked ??? in the code below). *MyAggregator* is our implementation
of *AggregateFunction*.

Does anyone have a template, or guide, to test a windowed aggregate
function?

Kind regards,
Lars


    val myWindowOperator = new WindowOperator(
      EventTimeSessionWindows.withGap(Time.seconds(10)),
      new TimeWindow.Serializer(),
      new KeySelector[MyInputType, (String, String)] {
        override def getKey(value: MyInputType): (String, String) = {
          (value.a, value.b)
        }
      },
      createTuple2TypeInformation(Types.STRING, Types.STRING).createSerializer(
        new ExecutionConfig
      ),
      new AggregatingStateDescriptor[MyInputType, MyAggregateState, MyOutputType](
        "test", new MyAggregator, classOf[MyAggregateState]
      ),
      ???,
      EventTimeTrigger.create(),
      0,
      null
    )

    testHarness = new KeyedOneInputStreamOperatorTestHarness[(String, String), MyInputType, MyOutputType](
      myWindowOperator,
      new KeySelector[MyInputType, (String, String)] {
        override def getKey(value: MyInputType): (String, String) = {
          (value.a, value.b)
        }
      },
      createTuple2TypeInformation(Types.STRING, Types.STRING)
    )
