There is no global order in parallel streams; it is something that applications need to work with. We are thinking about adding operations to introduce event-time order (at the cost of some delay), but those are only plans at this point.
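Because parallel streams have no global order, a test that compares collected output with an expected list has to ignore arrival order. Below is a minimal plain-Java sketch, assuming no Flink classes at all: threads stand in for parallel source instances, and the class and method names (`OrderInsensitiveCheck`, `runParallelProducers`, `sameElements`, `distinctCount`) are hypothetical. It sorts copies of both lists before comparing them, or counts distinct values with a hash set when only the distinct count matters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OrderInsensitiveCheck {

    /**
     * Hypothetical helper: runs several producer threads (stand-ins for parallel
     * source instances) that append to one shared list. The resulting order
     * depends on thread scheduling and is not deterministic.
     */
    public static List<Integer> runParallelProducers(int producers, int perProducer) {
        List<Integer> gathered = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    gathered.add(base + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for producers", e);
            }
        }
        return new ArrayList<>(gathered);
    }

    /** Compares contents while ignoring order by sorting copies of both lists. */
    public static boolean sameElements(List<Integer> expected, List<Integer> actual) {
        List<Integer> e = new ArrayList<>(expected);
        List<Integer> a = new ArrayList<>(actual);
        Collections.sort(e);
        Collections.sort(a);
        return e.equals(a);
    }

    /** Counts distinct values with a hash set when only the distinct count matters. */
    public static int distinctCount(List<Integer> actual) {
        Set<Integer> seen = new HashSet<>(actual);
        return seen.size();
    }

    public static void main(String[] args) {
        List<Integer> actual = runParallelProducers(4, 1000);
        List<Integer> expected = new ArrayList<>();
        for (int i = 0; i < 4000; i++) {
            expected.add(i);
        }
        // actual.equals(expected) may or may not hold, depending on interleaving;
        // the order-insensitive checks below give the same answer on every run.
        System.out.println(sameElements(expected, actual)); // true
        System.out.println(distinctCount(actual));          // 4000
    }
}
```

Sorting is the stricter check (it also catches duplicates and missing elements); the hash set is cheaper when a distinct count is all the test needs.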
What I do in my tests is run the test streams in parallel, but run the sink with DOP (degree of parallelism) 1. The sink gathers the elements in a list, and its close() function validates the result. Validating the result may involve sorting the list in which the elements were gathered (to make the order deterministic), or using a hash set if only the distinct count matters.

Hope that helps.

On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Thanks Stephan, I'll check that out in the morning. Generally speaking,
>> it would be great to have some single-JVM example tests for those of us
>> getting started. Following the example of WindowingIntegrationTest is
>> mostly working, though reusing my single sink instance with its static
>> collection results in non-deterministic results; there appears to be a race
>> between instances clearing the collection in their open method and the
>> runtime returning the collection to my test harness.
>
>
> This inconsistent test result is pretty frustrating. I've created a sample
> project with an IT that demonstrates the issue. Run `mvn test` multiple
> times and see that sometimes it passes and sometimes it fails. Maybe
> someone has some thoughts?
>
> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>
> Thanks,
> Nick
>
>> I'd also appreciate some guidance on stream composition. It's nice to use
>> the fluent API when exploring data in a shell, but it seems to me like that
>> API is cumbersome when composing data pipelines of reusable partials. Or
>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>
>> While I'm asking, how might you model this: I have a set of predicates
>> I'd like to flatMap over a stream. An input item should be compared against
>> every predicate (basically, I want a Clojure juxt of predicates over each
>> stream element). Imagine those predicates expressed as where clauses via
>> the Table API. Say I have hundreds of thousands of these predicates to run
>> over every stream event. Is the Java client API rich enough to express such
>> a flow, or should I examine something lower-level than DataStream?
>>
>> Thanks a lot, and sorry for all the newb questions.
>> -n
>>
>>
>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hey!
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project,
>>> see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a
>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>> stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> It seems "type" was what I needed. It also looks like the test
>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>
>>>> -n
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <type>test-jar</type>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-test-utils</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>>
>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>> > Hi Nick,
>>>> >
>>>> > we usually publish the test artifacts. Can you try replacing the
>>>> > <classifier> tag with <type>test-jar</type>:
>>>> >
>>>> > <dependency>
>>>> >   <groupId>org.apache.flink</groupId>
>>>> >   <artifactId>flink-streaming-core</artifactId>
>>>> >   <version>${flink.version}</version>
>>>> >   <type>test-jar</type>
>>>> >   <scope>test</scope>
>>>> > </dependency>
>>>> >
>>>> >
>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I'm attempting to write integration tests for my streaming flows. I'd like to
>>>> >> produce an input stream of Java objects and sink the results into a
>>>> >> collection for verification via JUnit asserts.
>>>> >> StreamExecutionEnvironment provides methods for the former; however,
>>>> >> how to achieve the latter is not evident from my internet
>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>> >> class, i.e., as used by WindowingIntegrationTest. However, this class
>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>> >> which is not published to Maven.
>>>> >>
>>>> >> For reference, this is the Maven dependency stanza I'm using. Please
>>>> >> let me know if I've got it wrong.
>>>> >>
>>>> >> Thanks,
>>>> >> Nick
>>>> >>
>>>> >> <dependency>
>>>> >>   <groupId>org.apache.flink</groupId>
>>>> >>   <artifactId>flink-streaming-core</artifactId>
>>>> >>   <version>${flink.version}</version>
>>>> >>   <classifier>test</classifier>
>>>> >>   <scope>test</scope>
>>>> >> </dependency>