Sorry Stephan, but I don't follow how global order applies in my case. I'm merely checking the size of the sink results. I assumed all tuples from a given test invocation would have sunk before the next test begins, which is clearly not the case. Is there a way I can place a barrier in my tests to ensure one streaming DAG runs at a time, and that all buffers have been flushed to the sink before the next test begins?

What is "sink in DOP 1"?

Thanks,
Nick
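For what it's worth, here is a minimal sketch of one way to get such a barrier, assuming a local environment, where execute() returns only after the job has finished; the names (SinkSizeIT, CollectSink, RESULTS) are hypothetical:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;

    import org.junit.Assert;
    import org.junit.Test;

    public class SinkSizeIT {

        // Shared by all parallel sink instances. Cleared by the test method,
        // not in the sink's open(), to avoid the race described below.
        static final List<Integer> RESULTS =
            Collections.synchronizedList(new ArrayList<Integer>());

        static class CollectSink implements SinkFunction<Integer> {
            @Override
            public void invoke(Integer value) {
                RESULTS.add(value);
            }
        }

        @Test
        public void testSinkSize() throws Exception {
            RESULTS.clear(); // reset before the job starts

            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
            env.fromElements(1, 2, 3).addSink(new CollectSink());

            // execute() blocks until the local job completes, so every tuple
            // has been sunk before the assertion runs -- the "barrier".
            env.execute();

            Assert.assertEquals(3, RESULTS.size());
        }
    }

Because execute() does not return until all sink instances have closed, each test runs its DAG to completion before the next one starts.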
On Wednesday, November 18, 2015, Stephan Ewen <se...@apache.org> wrote:

> There is no global order in parallel streams; it is something that applications need to work with. We are thinking about adding operations to introduce event-time order (at the cost of some delay), but those are only plans at this point.
>
> What I do in my tests is run the test streams in parallel, but the sink in DOP 1. The sink gathers the elements in a list, and the close() function validates the result.
>
> Validating the results may involve sorting the list where the elements were gathered (to make the order deterministic), or using a hash set if it is only about a distinct count.
>
> Hope that helps.
>
> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>>> Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-JVM example tests for those of us getting started. Following the example of WindowingIntegrationTest is mostly working, though reusing my single sink instance with its static collection produces non-deterministic results; there appears to be a race between instances clearing the collection in their open() method and the runtime returning the collection to my test harness.
>>
>> This inconsistent test result is pretty frustrating. I've created a sample project with an IT that demonstrates the issue. Run `mvn test` multiple times and see that sometimes it passes and sometimes it fails. Maybe someone has some thoughts?
>>
>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>
>> Thanks,
>> Nick
>>
>>> I'd also appreciate some guidance on stream composition. It's nice to use the fluent API when exploring data in a shell, but that API seems cumbersome when composing data pipelines from reusable partials. Or maybe I'm doing it all wrong... hence the request for more examples :)
>>>
>>> While I'm asking, how might you model this: I have a set of predicates I'd like to flatMap over a stream. An input item should be compared against every predicate (basically, I want a Clojure juxt of predicates over each stream element). Imagine those predicates expressed as where clauses via the Table API. Say I have hundreds of thousands of these predicates to run over every stream event. Is the Java client API rich enough to express such a flow, or should I examine something lower-level than DataStream?
>>>
>>> Thanks a lot, and sorry for all the newb questions.
>>> -n
>>>
>>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> There is also a collect() sink in the "flink-streaming-contrib" project, see here:
>>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>>
>>>> It should work well locally for testing. In that case you can write a program as usual and use "DataStreamUtils.collect(stream)"; you just need to stop reading it once you know the stream is exhausted...
>>>>
>>>> Stephan
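As a rough illustration of how that collect() sink might be used (a sketch, not code from the thread; it assumes flink-streaming-contrib is on the classpath and a bounded source, so the iterator terminates):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.flink.contrib.streaming.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CollectSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();

            DataStream<Long> stream = env.generateSequence(1, 10);

            // Starts the job and hands back an iterator over the stream's
            // elements as they arrive.
            Iterator<Long> it = DataStreamUtils.collect(stream);

            List<Long> results = new ArrayList<>();
            while (it.hasNext()) {      // with an unbounded source, break out
                results.add(it.next()); // once you have what you need
            }
            System.out.println(results); // 1..10, in some order
        }
    }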
>>>>
>>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> It seems "type" was what I needed. It also looks like the test jar has an undeclared dependency. In the end, the following allowed me to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>>
>>>>> -n
>>>>>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>>   <version>${flink.version}</version>
>>>>>   <type>test-jar</type>
>>>>>   <scope>test</scope>
>>>>> </dependency>
>>>>> <dependency>
>>>>>   <groupId>org.apache.flink</groupId>
>>>>>   <artifactId>flink-test-utils</artifactId>
>>>>>   <version>${flink.version}</version>
>>>>>   <scope>test</scope>
>>>>> </dependency>
>>>>>
>>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>>> > Hi Nick,
>>>>> >
>>>>> > we usually publish the test artifacts. Can you try replacing the <classifier> tag with <type>test-jar</type>:
>>>>> >
>>>>> > <dependency>
>>>>> >   <groupId>org.apache.flink</groupId>
>>>>> >   <artifactId>flink-streaming-core</artifactId>
>>>>> >   <version>${flink.version}</version>
>>>>> >   <type>test-jar</type>
>>>>> >   <scope>test</scope>
>>>>> > </dependency>
>>>>> >
>>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello,
>>>>> >>
>>>>> >> I'm attempting integration tests for my streaming flows. I'd like to produce an input stream of Java objects and sink the results into a collection for verification via JUnit asserts. StreamExecutionEnvironment provides methods for the former; however, how to achieve the latter is not evident from my internet searching. I think I've found a solution in the TestStreamEnvironment class, i.e., as used by WindowingIntegrationTest. However, this class appears to be packaged in the flink-streaming-core test artifact, which is not published to Maven.
>>>>> >>
>>>>> >> For reference, this is the Maven dependency stanza I'm using. Please let me know if I've got it wrong.
>>>>> >>
>>>>> >> Thanks,
>>>>> >> Nick
>>>>> >>
>>>>> >> <dependency>
>>>>> >>   <groupId>org.apache.flink</groupId>
>>>>> >>   <artifactId>flink-streaming-core</artifactId>
>>>>> >>   <version>${flink.version}</version>
>>>>> >>   <classifier>test</classifier>
>>>>> >>   <scope>test</scope>
>>>>> >> </dependency>
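For completeness, a hedged sketch of the "sink in DOP 1" pattern Stephan describes above: the pipeline stays parallel, the sink runs at parallelism 1, gathers elements into a list, and close() sorts and validates them. The class name and values are illustrative only:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import org.junit.Assert;

    public class Dop1SinkSketch {

        // Runs at parallelism 1, so a single instance sees every element and
        // close() fires exactly once, after all input has been consumed.
        static class ValidatingSink extends RichSinkFunction<Integer> {
            private transient List<Integer> seen;

            @Override
            public void open(Configuration parameters) {
                seen = new ArrayList<>();
            }

            @Override
            public void invoke(Integer value) {
                seen.add(value);
            }

            @Override
            public void close() {
                Collections.sort(seen); // make the order deterministic
                Assert.assertEquals(Arrays.asList(1, 2, 3), seen);
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
            env.setParallelism(4);           // the pipeline itself stays parallel

            env.fromElements(3, 1, 2)
               .addSink(new ValidatingSink())
               .setParallelism(1);           // ...but the sink runs in "DOP 1"

            env.execute();
        }
    }

If the assertion in close() fails, the job fails and execute() surfaces the error, so the validation doubles as the test's pass/fail signal.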