Hi Michael,

>From the WindowTranslationTest, I did not see anything about the
initialization of mini-cluster. Here we are testing operator, it seems
operator test harness has provided the necessary infrastructure.

You can try to see if there is anything missed.

Best,
Vino

Nguyen, Michael <michael.nguye...@t-mobile.com> 于2019年10月28日周一 下午4:51写道:

> Hi Vino,
>
>
>
> This is a great example – thank you!
>
>
>
> It looks like I need to instantiate a StreamExecutionEnvironment to order
> to get my OneInputStreamOperator. Would I need to setup a local
> flinkCluster using MiniClusterWithClientResource in order to use
> StreamExecutionEnvironment?
>
>
>
>
>
> Best,
>
> Michael
>
>
>
>
>
> *From: *vino yang <yanghua1...@gmail.com>
> *Date: *Monday, October 28, 2019 at 1:32 AM
> *To: *Michael Nguyen <michael.nguye...@t-mobile.com>
> *Cc: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: Testing AggregateFunction() and ProcessWindowFunction() on
> KeyedDataStream
>
>
>
> *[External]*
>
>
>
> Hi Michael,
>
>
>
> You may need to know `KeyedOneInputStreamOperatorTestHarness` test class.
>
>
>
> You can consider
> `WindowTranslationTest#testAggregateWithWindowFunctionEventTime` or
> `WindowTranslationTest#testAggregateWithWindowFunctionProcessingTime`[1](both
> of them call `processElementAndEnsureOutput`) as a example.
>
>
>
> [1]:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java#L676
> <https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgithub.com%2Fapache%2Fflink%2Fblob%2Fmaster%2Fflink-streaming-java%2Fsrc%2Ftest%2Fjava%2Forg%2Fapache%2Fflink%2Fstreaming%2Fruntime%2Foperators%2Fwindowing%2FWindowTranslationTest.java%23L676&data=02%7C01%7CMichael.Nguyen79%40t-mobile.com%7C5a3f25ed6f29450bc95508d75b81627c%7Cbe0f980bdd994b19bd7bbc71a09b026c%7C0%7C0%7C637078483575275538&sdata=AfjOTQGV8OFR9azcGzxpwXUrCRptRiYwVAVk7EYlNBY%3D&reserved=0>
>
>
>
> Best,
>
> Vino
>
>
>
> Nguyen, Michael <michael.nguye...@t-mobile.com> 于2019年10月28日周一 下午3:18写道:
>
> Hello everbody,
>
>
>
> Has anyone tried testing AggregateFunction() and ProcessWindowFunction()
> on a KeyedDataStream? I have reviewed the testing page on Flink’s official
> website (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> <https://nam02.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-stable%2Fdev%2Fstream%2Ftesting.html&data=02%7C01%7CMichael.Nguyen79%40t-mobile.com%7C5a3f25ed6f29450bc95508d75b81627c%7Cbe0f980bdd994b19bd7bbc71a09b026c%7C0%7C0%7C637078483575275538&sdata=aeedhITUuaNaBpt92scqzi0EcAqyyuVGVHKCM7euJtc%3D&reserved=0>)
> and I am not quite sure how I could utilize these two functions in an
> .aggregate() operator for my testing.
>
>
>
> Here’s how I am using the AggregateFunction (EventCountAggregate()) and
> ProcessWindowFunction (CalculateWindowTotal()) in my Flink job:
>
> DataStream<Tuple2<Date, Integer>> ec2EventsAggregate =
>         ec2Events
>                 .keyBy(t -> t.f0)
>                 .timeWindow(Time.*minutes*(30))
>                 .aggregate(new EventCountAggregate(), new
> CalculateWindowTotal())
>                 .name("EC2 creation interval count");
>
>
>
>
>
> EventCountAggregate() is counting the each element in ec2Events datastream.
>
>
>
> CalculateWindowTotal() takes the timestamp of each 30 minute window and
> correlates it to the number of elements that has been counted so far for
> the window which returns a Tuple2 containg the end timestamp and the count
> of elements.
>
>
>
>
>
> Thanks,
>
> Michael
>
>

Reply via email to