Thanks Stephan, I'll check that out in the morning. Generally speaking, it
would be great to have some single-jvm example tests for those of us
getting started. Following the example of WindowingIntegrationTest is
mostly working, though reusing my single sink instance with its static
collection results in non-deterministic results; there appears to be a race
between instances clearing the collection in their open method and the
runtime returning the collection to my test harness.

I'd also appreciate some guidance on stream composition. It's nice to use
the fluent API when exploring data in a shell, but it seems to me like that
API is cumbersome when composing data pipelines of reusable partials. Or
maybe I'm doing it all wrong... Hence the request for more examples :)

While I'm asking, how might you model this: I have a set of predicates I'd
like to flatMap over a stream. An input item should be compared vs every
predicate (basically, I want a Clojure juxt of predicates over each stream
element). Imagine those predicates expressed as where clauses via the Table
API. Say I have hundreds of thousands of these predicates to run over every
stream event. Is the java client API rich enough to express such a flow, or
should I examine something lower than DataStream?

Thanks a lot, and sorry for all the newb questions.
-n

On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:

> Hey!
>
> There is also a collect() sink in the "flink-streaming-contrib" project,
> see here:
> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>
> It should work well locally for testing. In that case you can write a
> program as usual an use "DataStreamUtils.collect(stream)", so you need to
> stop reading it once you know the stream is exhausted...
>
> Stephan
>
>
> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com
> <javascript:_e(%7B%7D,'cvml','ndimi...@gmail.com');>> wrote:
>
>> Hi Robert,
>>
>> It seems "type" was what I needed. This it also looks like the test
>> jar has an undeclared dependency. In the end, the following allowed me
>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>
>> -n
>>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-streaming-core</artifactId>
>>       <version>${flink.version}</version>
>>       <type>test-jar</type>
>>       <scope>test</scope>
>>     </dependency>
>>     <dependency>
>>       <groupId>org.apache.flink</groupId>
>>       <artifactId>flink-test-utils</artifactId>
>>       <version>${flink.version}</version>
>>       <scope>test</scope>
>>     </dependency>
>>
>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org
>> <javascript:_e(%7B%7D,'cvml','rmetz...@apache.org');>> wrote:
>> > Hi Nick,
>> >
>> > we are usually publishing the test  artifacts. Can you try and replace
>> the
>> > <classifier> tag by <type>test-jar<type>:
>> >
>> > <dependency>
>> >    <groupId>org.apache.flink</groupId>
>> >    <artifactId>flink-streaming-core</artifactId>
>> >    <version>${flink.version}</version>
>> >    <type>test-jar</type>
>> >    <scope>test</scope>
>> > </dependency>
>> >
>> >
>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com
>> <javascript:_e(%7B%7D,'cvml','ndimi...@gmail.com');>> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm attempting integration tests for my streaming flows. I'd like to
>> >> produce an input stream of java objects and sink the results into a
>> >> collection for verification via JUnit asserts.
>> >> StreamExecutionEnvironment provides methods for the former, however,
>> >> how to achieve the latter is not evident based on my internet
>> >> searching. I think I've found a solution in the TestStreamEnvironment
>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>> >> appears to be packaged in the flink-streaming-core test artifact,
>> >> which is not published to maven.
>> >>
>> >> For reference, this is the maven dependency stanza I'm using. Please
>> >> let me know if I've got it wrong.
>> >>
>> >> Thanks,
>> >> Nick
>> >>
>> >>     <dependency>
>> >>       <groupId>org.apache.flink</groupId>
>> >>       <artifactId>flink-streaming-core</artifactId>
>> >>       <version>${flink.version}</version>
>> >>       <classifier>test</classifier>
>> >>       <scope>test</scope>
>> >>     </dependency>
>> >
>> >
>>
>
>

Reply via email to