Yes, that's it!

My code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L144
now does this:

DataStream<TestRecord> resultDataStream = ...


List<TestRecord> result = new ArrayList<>(5);
DataStreamUtils
    .collect(resultDataStream)
    .forEachRemaining(result::add);

assertEquals(2, result.size());


And as you explained, because collect() already does an execute, this
works like a charm.
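
For anyone finding this thread later, the mechanism can be sketched with a
small toy model in plain Java. This is not Flink code: ToyEnvironment and its
methods are hypothetical stand-ins that only mimic the two behaviours
described in this thread, namely that collect() itself triggers an execute
and that every execute clears the registered operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy stand-in for a streaming environment; NOT Flink's API.
class ToyEnvironment {
    private final List<String> pendingOperators = new ArrayList<>();
    private int jobsRun = 0;

    // Mimics print(): only registers an operator, runs nothing.
    void print() {
        pendingOperators.add("print");
    }

    // Mimics collect(): registers a sink and immediately executes a job.
    List<String> collect(List<String> elements) {
        pendingOperators.add("collect");
        execute();
        return new ArrayList<>(elements);
    }

    // Mimics execute(): fails on an empty graph, otherwise runs one job
    // and clears the graph afterwards.
    void execute() {
        if (pendingOperators.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        jobsRun++;
        pendingOperators.clear();
    }

    int jobsRun() {
        return jobsRun;
    }
}

class GraphClearingSketch {
    public static void main(String[] args) {
        List<String> data = Arrays.asList("One", "Two");

        // Case 1: collect() then execute(): the second call finds an empty graph.
        ToyEnvironment first = new ToyEnvironment();
        first.collect(data);
        try {
            first.execute();
        } catch (IllegalStateException e) {
            System.out.println("collect, execute failed: " + e.getMessage());
        }

        // Case 2: collect(), print(), execute(): print() refills the graph,
        // so two separate jobs run.
        ToyEnvironment second = new ToyEnvironment();
        List<String> result = second.collect(data);
        second.print();
        second.execute();
        System.out.println("collect, print, execute ran " + second.jobsRun()
            + " jobs, collected " + result.size() + " elements");

        // Case 3: print(), collect(), execute(): collect() runs and clears
        // everything registered so far, so execute() fails again.
        ToyEnvironment third = new ToyEnvironment();
        third.print();
        third.collect(data);
        try {
            third.execute();
        } catch (IllegalStateException e) {
            System.out.println("print, collect, execute failed: " + e.getMessage());
        }
    }
}
```

In a real test this means the assertions can follow the collect call
directly, with no trailing execute().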

Niels




On Sat, Feb 22, 2020 at 1:38 AM Robert Metzger <rmetz...@apache.org> wrote:

> Hey,
> you are right. I'm also seeing this exception now. It was hidden in other
> log output.
>
> The solution to all this confusion is simple: DataStreamUtils.collect() is
> like an execute().
>
> The stream graph is cleared on each execute(). That's why collect() and
> then execute() lead to the "no operators defined" error.
> However, if you have collect(), print(), execute(), then the print() is
> filling the stream graph again, and you are executing two Flink jobs: the
> collect job and the execute job.
>
> I hope I got it right this time :)
>
> Best,
> Robert
>
> On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes <ni...@basjes.nl> wrote:
>
>> I tried this in Flink 1.10.0:
>>
>>     @Test
>>     public void experimentalTest() throws Exception {
>>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         DataStream<String> input = env.fromElements("One", "Two");
>> //        DataStream<String> input = env.addSource(new StringSourceFunction());
>>         List<String> result = new ArrayList<>(5);
>>         DataStreamUtils.collect(input).forEachRemaining(result::add);
>>         env.execute("Flink Streaming Java API Skeleton");
>>     }
>>
>>
>> Results in
>>
>>
>> java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
>>
>>      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>      at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>
>> ...
>>
>>
>>
>> On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hey Niels,
>>>
>>> This minimal Flink job executes in Flink 1.10:
>>>
>>> public static void main(String[] args) throws Exception {
>>>    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>    DataStream<String> input = env.addSource(new StringSourceFunction());
>>>    List<String> result = new ArrayList<>(5);
>>>    DataStreamUtils.collect(input).forEachRemaining(result::add);
>>>    env.execute("Flink Streaming Java API Skeleton");
>>> }
>>>
>>> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic
>>> that breaks with the StreamGraphGenerator?
>>>
>>> Best,
>>> Robert
>>>
>>> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi Gordon,
>>>>
>>>> Thanks. This works for me.
>>>>
>>>> I find it strange that this works (I marked the differences in
>>>> bold):
>>>>
>>>> List<TestRecord> result = new ArrayList<>(5);
>>>>
>>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>
>>>> *resultDataStream.print();*
>>>>
>>>> environment.execute();
>>>>
>>>>
>>>> However, this does not work:
>>>>
>>>> List<TestRecord> result = new ArrayList<>(5);
>>>>
>>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>
>>>> environment.execute();
>>>>
>>>>
>>>> And this also does not work:
>>>>
>>>> *resultDataStream.print();*
>>>>
>>>> List<TestRecord> result = new ArrayList<>(5);
>>>>
>>>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add);
>>>>
>>>> environment.execute();
>>>>
>>>>
>>>> In both of these cases it fails with:
>>>>
>>>>
>>>> java.lang.IllegalStateException: *No operators defined in streaming topology. Cannot execute.*
>>>>
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
>>>> at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144)
>>>>
>>>>
>>>>
>>>> Did I do something wrong?
>>>> Is this a bug in DataStreamUtils?
>>>>
>>>> Niels Basjes
>>>>
>>>>
>>>>
>>>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> To collect the elements of a DataStream (usually only meant for testing
>>>>> purposes), you can take a look at
>>>>> `DataStreamUtils#collect(DataStream)`.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
