Okay, I think I misunderstood your problem. Usually you can simply execute tests one after another by waiting until "env.execute()" returns. The streaming jobs terminate by themselves once the sources reach end of stream (finite streams are supported that way) but make sure all records flow through the entire stream (no barrier needed). So if you use a source that reaches an end, no extra work is needed.
If you have a proper infinite source (like Kafka), things are a bit more tricky, since you have a proper infinite streaming program. What we do in our Kafka Tests is throw a "SuccessException" in the sink once we saw all data we expected. You can get the cause exceptions in a try/catch around env.execute() to check if the program "failed" with a SuccessException, or whether it failed proper. A "sink in DOP 1" (sorry for the unclear terminology) is a sink with parallelism 1, so all data is collected by the same function instance. Any of this helpful? Stephan On Wed, Nov 18, 2015 at 5:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote: > Sorry Stephan but I don't follow how global order applies in my case. I'm > merely checking the size of the sink results. I assume all tuples from a > given test invitation have sunk before the next test begins, which is > clearly not the case. Is there a way I can place a barrier in my tests to > ensure one streaming DAG runs at a time, and that all buffers have been > flushed to the sink before the next test begins? > > What is "sink in DOP 1"? > > Thanks, > Nick > > > On Wednesday, November 18, 2015, Stephan Ewen <se...@apache.org> wrote: > >> There is no global order in parallel streams, it is something that >> applications need to work with. We are thinking about adding operations to >> introduce event-time order (at the cost of some delay), but that is only >> plans at this point. >> >> >> What I do in my tests is run the test streams in parallel, but the Sink >> in DOP 1. The sink gathers the elements in a list, and the close() function >> validates the result. >> >> Validating the results may involve sorting the list where elements where >> gathered (make the order deterministic) or use a hash set if it is only >> about distinct count. >> >> Hope that helps. >> >> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote: >> >>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote: >>> >>>> Thanks Stephan, I'll check that out in the morning. Generally speaking, >>>> it would be great to have some single-jvm example tests for those of us >>>> getting started. Following the example of WindowingIntegrationTest is >>>> mostly working, though reusing my single sink instance with its static >>>> collection results in non-deterministic results; there appears to be a race >>>> between instances clearing the collection in their open method and the >>>> runtime returning the collection to my test harness. >>> >>> >>> This inconsistent test result is pretty frustrating. I've created a >>> sample project with an IT that demonstrates the issue. Run `mvn test` >>> multiple times and see that sometimes it passes and sometimes it fails. >>> Maybe someone has some thoughts? >>> >>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6 >>> >>> Thanks, >>> Nick >>> >>> I'd also appreciate some guidance on stream composition. It's nice to >>>> use the fluent API when exploring data in a shell, but it seems to me like >>>> that API is cumbersome when composing data pipelines of reusable partials. >>>> Or maybe I'm doing it all wrong... Hence the request for more examples :) >>>> >>>> While I'm asking, how might you model this: I have a set of predicates >>>> I'd like to flatMap over a stream. An input item should be compared vs >>>> every predicate (basically, I want a Clojure juxt of predicates over each >>>> stream element). Imagine those predicates expressed as where clauses via >>>> the Table API. Say I have hundreds of thousands of these predicates to run >>>> over every stream event. Is the java client API rich enough to express such >>>> a flow, or should I examine something lower than DataStream? >>>> >>>> Thanks a lot, and sorry for all the newb questions. >>>> -n >>>> >>>> >>>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote: >>>> >>>>> Hey! >>>>> >>>>> There is also a collect() sink in the "flink-streaming-contrib" >>>>> project, see here: >>>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java >>>>> >>>>> It should work well locally for testing. In that case you can write a >>>>> program as usual an use "DataStreamUtils.collect(stream)", so you need to >>>>> stop reading it once you know the stream is exhausted... >>>>> >>>>> Stephan >>>>> >>>>> >>>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Robert, >>>>>> >>>>>> It seems "type" was what I needed. This it also looks like the test >>>>>> jar has an undeclared dependency. In the end, the following allowed me >>>>>> to use TestStreamEnvironment for my integration test. Thanks a lot! >>>>>> >>>>>> -n >>>>>> >>>>>> <dependency> >>>>>> <groupId>org.apache.flink</groupId> >>>>>> <artifactId>flink-streaming-core</artifactId> >>>>>> <version>${flink.version}</version> >>>>>> <type>test-jar</type> >>>>>> <scope>test</scope> >>>>>> </dependency> >>>>>> <dependency> >>>>>> <groupId>org.apache.flink</groupId> >>>>>> <artifactId>flink-test-utils</artifactId> >>>>>> <version>${flink.version}</version> >>>>>> <scope>test</scope> >>>>>> </dependency> >>>>>> >>>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> >>>>>> wrote: >>>>>> > Hi Nick, >>>>>> > >>>>>> > we are usually publishing the test artifacts. Can you try and >>>>>> replace the >>>>>> > <classifier> tag by <type>test-jar<type>: >>>>>> > >>>>>> > <dependency> >>>>>> > <groupId>org.apache.flink</groupId> >>>>>> > <artifactId>flink-streaming-core</artifactId> >>>>>> > <version>${flink.version}</version> >>>>>> > <type>test-jar</type> >>>>>> > <scope>test</scope> >>>>>> > </dependency> >>>>>> > >>>>>> > >>>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> >>>>>> wrote: >>>>>> >> >>>>>> >> Hello, >>>>>> >> >>>>>> >> I'm attempting integration tests for my streaming flows. I'd like >>>>>> to >>>>>> >> produce an input stream of java objects and sink the results into a >>>>>> >> collection for verification via JUnit asserts. >>>>>> >> StreamExecutionEnvironment provides methods for the former, >>>>>> however, >>>>>> >> how to achieve the latter is not evident based on my internet >>>>>> >> searching. I think I've found a solution in the >>>>>> TestStreamEnvironment >>>>>> >> class, ie, as used by WindowingIntegrationTest. However, this class >>>>>> >> appears to be packaged in the flink-streaming-core test artifact, >>>>>> >> which is not published to maven. >>>>>> >> >>>>>> >> For reference, this is the maven dependency stanza I'm using. >>>>>> Please >>>>>> >> let me know if I've got it wrong. >>>>>> >> >>>>>> >> Thanks, >>>>>> >> Nick >>>>>> >> >>>>>> >> <dependency> >>>>>> >> <groupId>org.apache.flink</groupId> >>>>>> >> <artifactId>flink-streaming-core</artifactId> >>>>>> >> <version>${flink.version}</version> >>>>>> >> <classifier>test</classifier> >>>>>> >> <scope>test</scope> >>>>>> >> </dependency> >>>>>> > >>>>>> > >>>>>> >>>>> >>>>> >>> >>