Hi Michael,

From the WindowTranslationTest, I did not see anything about initializing a mini-cluster. Here we are testing an operator, and it seems the operator test harness already provides the necessary infrastructure.
You can try it and see if anything is missing.

Best,
Vino

Nguyen, Michael <michael.nguye...@t-mobile.com> wrote on Mon, Oct 28, 2019 at 4:51 PM:

> Hi Vino,
>
> This is a great example – thank you!
>
> It looks like I need to instantiate a StreamExecutionEnvironment in order
> to get my OneInputStreamOperator. Would I need to set up a local
> flinkCluster using MiniClusterWithClientResource in order to use
> StreamExecutionEnvironment?
>
> Best,
> Michael
>
> *From: *vino yang <yanghua1...@gmail.com>
> *Date: *Monday, October 28, 2019 at 1:32 AM
> *To: *Michael Nguyen <michael.nguye...@t-mobile.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Testing AggregateFunction() and ProcessWindowFunction() on
> KeyedDataStream
>
> *[External]*
>
> Hi Michael,
>
> You may want to look at the `KeyedOneInputStreamOperatorTestHarness` test class.
>
> You can consider
> `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
> `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime` [1] (both
> of them call `processElementAndEnsureOutput`) as an example.
>
> [1]:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676
>
> Best,
> Vino
>
> Nguyen, Michael <michael.nguye...@t-mobile.com> wrote on Mon, Oct 28, 2019 at 3:18 PM:
>
> Hello everybody,
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html)
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream<Tuple2<Date, Integer>> ec2EventsAggregate =
>         ec2Events
>                 .keyBy(t -> t.f0)
>                 .timeWindow(Time.minutes(30))
>                 .aggregate(new EventCountAggregate(), new CalculateWindowTotal())
>                 .name("EC2 creation interval count");
>
> EventCountAggregate() counts each element in the ec2Events datastream.
>
> CalculateWindowTotal() takes the end timestamp of each 30-minute window and
> pairs it with the number of elements counted so far for that window,
> returning a Tuple2 containing the end timestamp and the count
> of elements.
>
> Thanks,
> Michael
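
For the two functions described in the original question, the counting and pairing logic can also be checked without any Flink runtime, by calling the function methods directly. The sketch below is only an illustration under stated assumptions. `Aggregate` is a local stand-in that copies the four method names of Flink's `AggregateFunction` so the code compiles on its own. `EventCountSketch`, `WindowTotalSketch`, and `WindowSimulation` are hypothetical names, since the thread does not show the real class bodies. The `String` input type and the `Map.Entry` result (in place of `Tuple2`) are likewise assumptions.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Date;
import java.util.List;
import java.util.Map;

// Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>.
// Declared here only so the sketch compiles without Flink on the classpath.
interface Aggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical body for EventCountAggregate: adds one per element seen.
class EventCountSketch implements Aggregate<String, Integer, Integer> {
    public Integer createAccumulator() { return 0; }
    public Integer add(String value, Integer accumulator) { return accumulator + 1; }
    public Integer getResult(Integer accumulator) { return accumulator; }
    public Integer merge(Integer a, Integer b) { return a + b; }
}

// Hypothetical core of CalculateWindowTotal: pairs the window end with the count.
class WindowTotalSketch {
    static Map.Entry<Date, Integer> total(long windowEndMillis, Integer count) {
        return new SimpleEntry<>(new Date(windowEndMillis), count);
    }
}

// Drives both pieces in the order a single window would: accumulate, then emit.
class WindowSimulation {
    static Map.Entry<Date, Integer> run(List<String> events, long windowEndMillis) {
        EventCountSketch aggregate = new EventCountSketch();
        Integer accumulator = aggregate.createAccumulator();
        for (String event : events) {
            accumulator = aggregate.add(event, accumulator);
        }
        return WindowTotalSketch.total(windowEndMillis, aggregate.getResult(accumulator));
    }
}
```

A test of this kind only covers the function logic. Window assignment, timers, and keyed state still need the operator test harness discussed above.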