Please see the above gist: my test makes no assertions until after the env.execute() call. Adding setParallelism(1) to my sink appears to stabilize my test. Indeed, very helpful. Thanks a lot!

-n
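A minimal sketch of that stabilized pattern follows. The class and variable names here are hypothetical, assuming the 0.10-era DataStream API and JUnit 4:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.junit.Test;
    import static org.junit.Assert.assertEquals;

    public class CollectingSinkTest {

      // Static so the sink instance running inside the local cluster and the
      // test method see the same list; parallelism 1 means a single writer.
      static final List<String> RESULTS =
          Collections.synchronizedList(new ArrayList<String>());

      // A named class rather than an anonymous one, so the sink serializes
      // cleanly with the job graph.
      public static class CollectingSink implements SinkFunction<String> {
        @Override
        public void invoke(String value) throws Exception {
          RESULTS.add(value);
        }
      }

      @Test
      public void allElementsReachTheSink() throws Exception {
        // Clear in the test body, not in the sink's open(), to avoid racing
        // a new job's instances against the previous job's results.
        RESULTS.clear();

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment();
        env.fromElements("a", "b", "c")
            .addSink(new CollectingSink())
            .setParallelism(1); // one sink instance collects everything

        env.execute(); // blocks until the finite source is exhausted

        assertEquals(3, RESULTS.size()); // assert only after execute() returns
      }
    }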
On Wed, Nov 18, 2015 at 9:15 AM, Stephan Ewen <se...@apache.org> wrote:

> Okay, I think I misunderstood your problem.
>
> Usually you can simply execute tests one after another by waiting until "env.execute()" returns. The streaming jobs terminate by themselves once the sources reach end of stream (finite streams are supported that way), but they make sure all records flow through the entire stream (no barrier needed). So if you use a source that reaches an end, no extra work is needed.
>
> If you have a proper infinite source (like Kafka), things are a bit more tricky, since you have a proper infinite streaming program. What we do in our Kafka tests is throw a "SuccessException" in the sink once we have seen all the data we expected. You can get the cause exceptions in a try/catch around env.execute() to check whether the program "failed" with a SuccessException, or whether it failed properly.
>
> A "sink in DOP 1" (sorry for the unclear terminology) is a sink with parallelism 1, so all data is collected by the same function instance.
>
> Any of this helpful?
>
> Stephan
>
> On Wed, Nov 18, 2015 at 5:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Sorry Stephan, but I don't follow how global order applies in my case. I'm merely checking the size of the sink results. I assume all tuples from a given test invocation have sunk before the next test begins, which is clearly not the case. Is there a way I can place a barrier in my tests to ensure one streaming DAG runs at a time, and that all buffers have been flushed to the sink before the next test begins?
>>
>> What is "sink in DOP 1"?
>>
>> Thanks,
>> Nick
>>
>> On Wednesday, November 18, 2015, Stephan Ewen <se...@apache.org> wrote:
>>
>>> There is no global order in parallel streams; it is something that applications need to work with. We are thinking about adding operations to introduce event-time order (at the cost of some delay), but those are only plans at this point.
>>>
>>> What I do in my tests is run the test streams in parallel, but the sink in DOP 1. The sink gathers the elements in a list, and the close() function validates the result.
>>>
>>> Validating the results may involve sorting the list in which the elements were gathered (to make the order deterministic), or using a hash set if it is only about a distinct count.
>>>
>>> Hope that helps.
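The SuccessException trick for infinite sources might look roughly like this. This is a sketch with hypothetical names (SuccessExceptionPattern, ExpectingSink, executeAndExpectSuccess), not the actual Kafka test code:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    public class SuccessExceptionPattern {

      // Marker exception, thrown on purpose to tear down an otherwise-infinite job.
      public static class SuccessException extends RuntimeException {}

      // Sink that "fails" the job once it has seen everything the test expects.
      public static class ExpectingSink implements SinkFunction<String> {
        private final int expected;
        private int seen; // per-instance count, so run this sink with parallelism 1

        public ExpectingSink(int expected) {
          this.expected = expected;
        }

        @Override
        public void invoke(String value) throws Exception {
          if (++seen >= expected) {
            throw new SuccessException();
          }
        }
      }

      // Treat a SuccessException anywhere in the cause chain as a pass;
      // rethrow anything else as a genuine failure.
      public static void executeAndExpectSuccess(StreamExecutionEnvironment env)
          throws Exception {
        try {
          env.execute();
        } catch (Exception e) {
          for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof SuccessException) {
              return; // the job "failed" with our marker: all expected data was seen
            }
          }
          throw e;
        }
      }
    }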
>>> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>
>>>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>>> Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-JVM example tests for those of us getting started. Following the example of WindowingIntegrationTest is mostly working, though reusing my single sink instance with its static collection results in non-deterministic results; there appears to be a race between instances clearing the collection in their open() method and the runtime returning the collection to my test harness.
>>>>
>>>> This inconsistent test result is pretty frustrating. I've created a sample project with an IT that demonstrates the issue. Run `mvn test` multiple times and see that sometimes it passes and sometimes it fails. Maybe someone has some thoughts?
>>>>
>>>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>>> I'd also appreciate some guidance on stream composition. It's nice to use the fluent API when exploring data in a shell, but it seems to me like that API is cumbersome when composing data pipelines of reusable partials. Or maybe I'm doing it all wrong... Hence the request for more examples :)
>>>>>
>>>>> While I'm asking, how might you model this: I have a set of predicates I'd like to flatMap over a stream. An input item should be compared against every predicate (basically, I want a Clojure juxt of predicates over each stream element). Imagine those predicates expressed as where clauses via the Table API. Say I have hundreds of thousands of these predicates to run over every stream event. Is the Java client API rich enough to express such a flow, or should I examine something lower than DataStream?
>>>>>
>>>>> Thanks a lot, and sorry for all the newb questions.
>>>>> -n
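On the juxt-of-predicates question just above, one possible shape with the plain DataStream API is to hold the predicate set inside a single FlatMapFunction and emit one tagged record per match. A sketch: the JuxtPredicates name, the element type T, and the predicate map are all hypothetical, and it assumes the predicates fit in each task's memory:

    import java.util.Map;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Applies every predicate to every element, like Clojure's juxt, emitting
    // a (predicateId, element) pair for each predicate that matches.
    public class JuxtPredicates<T> implements FlatMapFunction<T, Tuple2<String, T>> {

      // predicate id -> predicate; the map must be serializable so it can
      // ship to the task managers with the job
      private final Map<String, FilterFunction<T>> predicates;

      public JuxtPredicates(Map<String, FilterFunction<T>> predicates) {
        this.predicates = predicates;
      }

      @Override
      public void flatMap(T value, Collector<Tuple2<String, T>> out) throws Exception {
        for (Map.Entry<String, FilterFunction<T>> p : predicates.entrySet()) {
          if (p.getValue().filter(value)) {
            out.collect(new Tuple2<String, T>(p.getKey(), value)); // one output per match
          }
        }
      }
    }

Since the output type is generic, Flink's type extraction will likely need a hint on the resulting stream (e.g. via returns(...)); and at hundreds of thousands of predicates, indexing them rather than scanning linearly per element becomes the real design problem.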
>>>>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hey!
>>>>>>
>>>>>> There is also a collect() sink in the "flink-streaming-contrib" project, see here: https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>>>>
>>>>>> It should work well locally for testing. In that case you can write a program as usual and use "DataStreamUtils.collect(stream)"; you just need to stop reading it once you know the stream is exhausted...
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> It seems "type" was what I needed. It also looks like the test jar has an undeclared dependency. In the end, the following allowed me to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>>>>
>>>>>>> -n
>>>>>>>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>>>>   <version>${flink.version}</version>
>>>>>>>   <type>test-jar</type>
>>>>>>>   <scope>test</scope>
>>>>>>> </dependency>
>>>>>>> <dependency>
>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>   <artifactId>flink-test-utils</artifactId>
>>>>>>>   <version>${flink.version}</version>
>>>>>>>   <scope>test</scope>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Nick,
>>>>>>>>
>>>>>>>> we are usually publishing the test artifacts. Can you try and replace the <classifier> tag by <type>test-jar</type>:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>   <type>test-jar</type>
>>>>>>>>   <scope>test</scope>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> I'm attempting integration tests for my streaming flows. I'd like to produce an input stream of Java objects and sink the results into a collection for verification via JUnit asserts. StreamExecutionEnvironment provides methods for the former; however, how to achieve the latter is not evident from my internet searching. I think I've found a solution in the TestStreamEnvironment class, i.e., as used by WindowingIntegrationTest. However, this class appears to be packaged in the flink-streaming-core test artifact, which is not published to Maven.
>>>>>>>>>
>>>>>>>>> For reference, this is the Maven dependency stanza I'm using. Please let me know if I've got it wrong.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Nick
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>   <groupId>org.apache.flink</groupId>
>>>>>>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>>>>>>   <version>${flink.version}</version>
>>>>>>>>>   <classifier>test</classifier>
>>>>>>>>>   <scope>test</scope>
>>>>>>>>> </dependency>
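For completeness, usage of the DataStreamUtils.collect() helper Stephan pointed to upthread might look like the following sketch. The class name is hypothetical, and it assumes flink-streaming-contrib is on the classpath; hasNext() blocks until the next element arrives, so only drain the iterator fully when the stream is known to be finite:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.flink.contrib.streaming.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CollectExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment();
        DataStream<Long> stream = env.generateSequence(1, 100);

        // collect() kicks off the job and hands back an iterator over its output
        Iterator<Long> results = DataStreamUtils.collect(stream);

        List<Long> collected = new ArrayList<Long>();
        while (results.hasNext()) { // blocks per element; ends with the finite stream
          collected.add(results.next());
        }
        System.out.println(collected.size()); // 100
      }
    }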