Yes, that's it! My code at https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L144 now does this:

    DataStream<TestRecord> resultDataStream = ...

    List<TestRecord> result = new ArrayList<>(5);
    DataStreamUtils
        .collect(resultDataStream)
        .forEachRemaining(result::add);

    assertEquals(2, result.size());

And, as you explained, because collect() already does an execute(), this works like a charm.

Niels

On Sat, Feb 22, 2020 at 1:38 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hey,
>
> you are right. I'm also seeing this exception now. It was hidden in other
> log output.
>
> The solution to all this confusion is simple: DataStreamUtils.collect() is
> like an execute().
>
> The stream graph is cleared on each execute(). That's why collect() and
> then execute() lead to the "no operators defined" error.
> However, if you have collect(), print(), execute(), then the print() is
> filling the stream graph again, and you are executing two Flink jobs: the
> collect job and the execute job.
>
> I hope I got it right this time :)
>
> Best,
> Robert
>
> On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes <ni...@basjes.nl> wrote:
>
>> I tried this in Flink 1.10.0:
>>
>>     @Test
>>     public void experimentalTest() throws Exception {
>>         final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>         DataStream<String> input = env.fromElements("One", "Two");
>>         // DataStream<String> input = env.addSource(new StringSourceFunction());
>>         List<String> result = new ArrayList<>(5);
>>         DataStreamUtils.collect(input).forEachRemaining(result::add);
>>         env.execute("Flink Streaming Java API Skeleton");
>>     }
>>
>> This results in:
>>
>> java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>>
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>     at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     ...
>>
>> On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hey Niels,
>>>
>>> This minimal Flink job executes in Flink 1.10:
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>         DataStream<String> input = env.addSource(new StringSourceFunction());
>>>         List<String> result = new ArrayList<>(5);
>>>         DataStreamUtils.collect(input).forEachRemaining(result::add);
>>>         env.execute("Flink Streaming Java API Skeleton");
>>>     }
>>>
>>> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
>>> that breaks with the StreamGraphGenerator?
>>>
>>> Best,
>>> Robert
>>>
>>> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi Gordon,
>>>>
>>>> Thanks. This works for me.
>>>>
>>>> I find it strange that when I do this it works (I made the differences
>>>> bold):
>>>>
>>>>     List<TestRecord> result = new ArrayList<>(5);
>>>>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>     *resultDataStream.print();*
>>>>     environment.execute();
>>>>
>>>> However, this does not work:
>>>>
>>>>     List<TestRecord> result = new ArrayList<>(5);
>>>>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>     environment.execute();
>>>>
>>>> and this also does not work:
>>>>
>>>>     *resultDataStream.print();*
>>>>     List<TestRecord> result = new ArrayList<>(5);
>>>>     DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>     environment.execute();
>>>>
>>>> In both of these cases it fails with:
>>>>
>>>> java.lang.IllegalStateException: *No operators defined in streaming
>>>> topology. Cannot execute.*
>>>>
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>>>>     at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>>>
>>>> Did I do something wrong?
>>>> Is this a bug in DataStreamUtils?
>>>>
>>>> Niels Basjes
>>>>
>>>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> To collect the elements of a DataStream (usually only meant for testing
>>>>> purposes), you can take a look at
>>>>> `DataStreamUtils#collect(DataStream)`.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
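The call orders discussed in this thread can be replayed against a small, stdlib-only Java toy model of Robert's explanation. This is a sketch, not Flink code: `StreamGraphModel`, `ToyEnvironment` and `outcome` are hypothetical names, and the model simply assumes the two rules Robert states, namely that `DataStreamUtils.collect()` behaves like an `execute()` and that every `execute()` clears the stream graph. It runs the collect job synchronously, which is a deliberate simplification of whatever Flink does internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model, NOT Flink code. It only mimics two rules from this
// thread: collect() acts like execute(), and execute() clears the stream graph.
public class StreamGraphModel {

    // Hypothetical stand-in for StreamExecutionEnvironment that only tracks sinks.
    static class ToyEnvironment {
        private final List<String> pendingSinks = new ArrayList<>();
        private int jobsRun = 0;

        // Like DataStream.print(): registers a sink, but runs nothing yet.
        void print() {
            pendingSinks.add("print");
        }

        // Simplified, synchronous stand-in for DataStreamUtils.collect():
        // it adds its own sink and then immediately runs a job, like execute().
        void collect() {
            pendingSinks.add("collect");
            execute();
        }

        void execute() {
            if (pendingSinks.isEmpty()) {
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            jobsRun++;            // one job runs all sinks registered so far
            pendingSinks.clear(); // the graph is cleared on every execute()
        }
    }

    // Replays a sequence of calls on a fresh environment and reports the outcome.
    static String outcome(String... calls) {
        ToyEnvironment env = new ToyEnvironment();
        try {
            for (String call : calls) {
                switch (call) {
                    case "print":   env.print();   break;
                    case "collect": env.collect(); break;
                    case "execute": env.execute(); break;
                    default: throw new IllegalArgumentException("unknown call: " + call);
                }
            }
            return "ok, jobs=" + env.jobsRun;
        } catch (IllegalStateException e) {
            return "failed after jobs=" + env.jobsRun + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        String[][] scenarios = {
            {"collect", "execute"},          // fails: graph already cleared by collect
            {"print", "collect", "execute"}, // fails: print ran inside the collect job
            {"collect", "print", "execute"}, // works: two separate jobs
            {"collect"}                      // works: the final version of the test
        };
        for (String[] s : scenarios) {
            System.out.println(Arrays.toString(s) + " -> " + outcome(s));
        }
    }
}
```

In this model, collect-then-execute and print-collect-execute both fail with the "No operators defined" error after one job has already run, collect-print-execute succeeds by running two jobs, and collect on its own (the pattern Niels ended up with) runs exactly one job.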