Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-JVM example tests for those of us getting started. Following the example of WindowingIntegrationTest mostly works, though reusing my single sink instance with its static collection produces non-deterministic results; there appears to be a race between instances clearing the collection in their open method and the runtime returning the collection to my test harness.
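One possible way around this kind of race is to never clear the shared collection at all and instead give each sink object its own key into a static map. The sketch below is plain Java with no Flink dependency; KeyedCollectingSink, collect and drain are hypothetical names, and it assumes all copies of the sink run inside the test JVM. In a real test the class would presumably implement Flink's SinkFunction and call collect() from invoke().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical test sink (not a Flink class): results are keyed per sink object. */
final class KeyedCollectingSink<T> implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    // Static, so serialized copies of a sink running in the same JVM share the storage.
    private static final Map<String, List<Object>> RESULTS = new ConcurrentHashMap<>();

    // Survives serialization, so every copy of this sink writes under the same key.
    private final String runId = UUID.randomUUID().toString();

    /** Records one value; safe to call from several threads. */
    void collect(T value) {
        RESULTS.computeIfAbsent(runId, k -> new CopyOnWriteArrayList<>()).add(value);
    }

    /** Removes and returns everything recorded under this sink's key. */
    @SuppressWarnings("unchecked")
    List<T> drain() {
        List<Object> stored = RESULTS.remove(runId);
        List<T> copy = new ArrayList<>();
        if (stored != null) {
            for (Object o : stored) {
                copy.add((T) o);
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        KeyedCollectingSink<String> first = new KeyedCollectingSink<>();
        KeyedCollectingSink<String> second = new KeyedCollectingSink<>();
        first.collect("a");
        second.collect("x");
        first.collect("b");
        System.out.println(first.drain());  // values written through 'first' only
        System.out.println(second.drain()); // values written through 'second' only
    }
}
```

Because nothing is cleared in an open method, there is no window in which one instance can wipe results another instance has already written; the test harness calls drain() once the job has finished.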

I'd also appreciate some guidance on stream composition. It's nice to use the fluent API when exploring data in a shell, but that API seems cumbersome when composing data pipelines from reusable partials. Or maybe I'm doing it all wrong... Hence the request for more examples :)

While I'm asking, how might you model this: I have a set of predicates I'd like to flatMap over a stream. Each input item should be compared against every predicate (basically, I want a Clojure juxt of predicates over each stream element). Imagine those predicates expressed as where clauses via the Table API. Say I have hundreds of thousands of these predicates to run over every stream event. Is the Java client API rich enough to express such a flow, or should I examine something lower-level than DataStream?

Thanks a lot, and sorry for all the newb questions.
-n

On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:

> Hey!
>
> There is also a collect() sink in the "flink-streaming-contrib" project,
> see here:
> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>
> It should work well locally for testing. In that case you can write a
> program as usual and use "DataStreamUtils.collect(stream)", so you need to
> stop reading it once you know the stream is exhausted...
>
> Stephan
>
>
> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> It seems "type" was what I needed. It also looks like the test
>> jar has an undeclared dependency. In the end, the following allowed me
>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>
>> -n
>>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-streaming-core</artifactId>
>>   <version>${flink.version}</version>
>>   <type>test-jar</type>
>>   <scope>test</scope>
>> </dependency>
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-test-utils</artifactId>
>>   <version>${flink.version}</version>
>>   <scope>test</scope>
>> </dependency>
>>
>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>> > Hi Nick,
>> >
>> > we are usually publishing the test artifacts. Can you try and replace the
>> > <classifier> tag by <type>test-jar</type>:
>> >
>> > <dependency>
>> >   <groupId>org.apache.flink</groupId>
>> >   <artifactId>flink-streaming-core</artifactId>
>> >   <version>${flink.version}</version>
>> >   <type>test-jar</type>
>> >   <scope>test</scope>
>> > </dependency>
>> >
>> >
>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>> >>
>> >> Hello,
>> >>
>> >> I'm attempting integration tests for my streaming flows. I'd like to
>> >> produce an input stream of Java objects and sink the results into a
>> >> collection for verification via JUnit asserts.
>> >> StreamExecutionEnvironment provides methods for the former; however,
>> >> how to achieve the latter is not evident based on my internet
>> >> searching. I think I've found a solution in the TestStreamEnvironment
>> >> class, i.e., as used by WindowingIntegrationTest. However, this class
>> >> appears to be packaged in the flink-streaming-core test artifact,
>> >> which is not published to Maven.
>> >>
>> >> For reference, this is the Maven dependency stanza I'm using. Please
>> >> let me know if I've got it wrong.
>> >>
>> >> Thanks,
>> >> Nick
>> >>
>> >> <dependency>
>> >>   <groupId>org.apache.flink</groupId>
>> >>   <artifactId>flink-streaming-core</artifactId>
>> >>   <version>${flink.version}</version>
>> >>   <classifier>test</classifier>
>> >>   <scope>test</scope>
>> >> </dependency>
>> >
>> >
>> >
>
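
For reference, the "juxt of predicates" question at the top of this message can be pinned down with a small plain-Java sketch. It only illustrates the intended semantics (every event is tested against every registered predicate, and the names of the matching ones are emitted); PredicateFanOut and matches are hypothetical names, and nothing here is Flink API. In a DataStream job the loop would presumably live inside a FlatMapFunction, and whether that scales to hundreds of thousands of predicates is exactly the open question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper: applies every named predicate to a single event. */
final class PredicateFanOut {

    /** Returns the names of all predicates that accept the event, in map iteration order. */
    static <T> List<String> matches(Map<String, Predicate<T>> predicates, T event) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, Predicate<T>> entry : predicates.entrySet()) {
            if (entry.getValue().test(event)) {
                hits.add(entry.getKey());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps registration order, so output is deterministic.
        Map<String, Predicate<Integer>> predicates = new LinkedHashMap<>();
        predicates.put("even", n -> n % 2 == 0);
        predicates.put("positive", n -> n > 0);
        predicates.put("large", n -> n > 100);

        for (int event : new int[] {4, -3, 102}) {
            System.out.println(event + " -> " + matches(predicates, event));
        }
    }
}
```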