Hi Michael,

You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.
You can use `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1] as an example; both of them call `processElementAndEnsureOutput`.

[1]: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino

Nguyen, Michael <michael.nguye...@t-mobile.com> wrote on Mon, Oct 28, 2019 at 3:18 PM:

> Hello everybody,
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream<Tuple2<Date, Integer>> ec2EventsAggregate =
>     ec2Events
>         .keyBy(t -> t.f0)
>         .timeWindow(Time.minutes(30))
>         .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
>         .name("EC2 creation interval count");
>
> EventCountAggregate() counts each element in the ec2Events datastream.
>
> CalculateWindowTotal() takes the timestamp of each 30-minute window and
> correlates it with the number of elements that have been counted so far for
> the window, returning a Tuple2 containing the end timestamp and the count
> of elements.
>
> Thanks,
>
> Michael
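
Separately from the harness, the counting logic described above can be checked by calling the aggregate function's methods directly. The following is only a minimal sketch under assumptions: the thread does not show the body of `EventCountAggregate`, so the implementation here (an `Integer` accumulator incremented once per element) is a guess, and the local `AggregateFunction` interface is a stand-in that mirrors the four methods of Flink's `org.apache.flink.api.common.functions.AggregateFunction` so that the example compiles without Flink on the classpath. `EventCountAggregateCheck` and `countAll` are hypothetical helper names.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for org.apache.flink.api.common.functions.AggregateFunction.
// In a real project, delete this interface and import the Flink one instead.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// ASSUMED shape of EventCountAggregate: counts every element it receives.
class EventCountAggregate<T> implements AggregateFunction<T, Integer, Integer> {
    @Override
    public Integer createAccumulator() {
        return 0; // a new window starts with zero elements
    }

    @Override
    public Integer add(T value, Integer accumulator) {
        return accumulator + 1; // the element's content is ignored, only counted
    }

    @Override
    public Integer getResult(Integer accumulator) {
        return accumulator;
    }

    @Override
    public Integer merge(Integer a, Integer b) {
        return a + b; // combining two partial counts
    }
}

// Hypothetical helper that drives the function the way a window would.
class EventCountAggregateCheck {
    static <T> int countAll(AggregateFunction<T, Integer, Integer> fn, List<T> elements) {
        Integer acc = fn.createAccumulator();
        for (T element : elements) {
            acc = fn.add(element, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        EventCountAggregate<String> fn = new EventCountAggregate<>();
        int count = countAll(fn, Arrays.asList("a", "b", "c"));
        if (count != 3) {
            throw new AssertionError("expected 3 but got " + count);
        }
        System.out.println("count = " + count);
    }
}
```

This covers only the aggregation step. Checking `CalculateWindowTotal()` together with the window assignment and timestamps still needs the operator-level approach from the reply, since a `ProcessWindowFunction` depends on the window context that Flink supplies.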