Hi Michael,

You may need to know `KeyedOneInputStreamOperatorTestHarness` test class.

You can consider
`WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
`WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime`[1](both
of them call `processElementAndEnsureOutput`) as a example.

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676

Best,
Vino

Nguyen, Michael <michael.nguye...@t-mobile.com> 于2019年10月28日周一 下午3:18写道:

> Hello everbody,
>
>
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
>
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream<Tuple2<Date, Integer>> ec2EventsAggregate =
>         ec2Events
>                 .keyBy(t -> t.f0)
>                 .timeWindow(Time.*minutes*(30))
>                 .aggregate(new EventCountAggregate(), new
> CalculateWindowTotal())
>                 .name("EC2 creation interval count");
>
>
>
>
>
> EventCountAggregate() is counting the each element in ec2Events datastream.
>
>
>
> CalculateWindowTotal() takes the timestamp of each 30 minute window and
> correlates it to the number of elements that has been counted so far for
> the window which returns a Tuple2 containg the end timestamp and the count
> of elements.
>
>
>
>
>
> Thanks,
>
> Michael
>

Reply via email to