On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
> would be great to have some single-jvm example tests for those of us
> getting started. Following the example of WindowingIntegrationTest is
> mostly working, though reusing my single sink instance with its static
> collection results in non-deterministic results; there appears to be a race
> between instances clearing the collection in their open method and the
> runtime returning the collection to my test harness.

This inconsistent test result is pretty frustrating. I've created a sample
project with an IT that demonstrates the issue. Run `mvn test` multiple times
and see that sometimes it passes and sometimes it fails. Maybe someone has
some thoughts?

https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6

Thanks,
Nick

> I'd also appreciate some guidance on stream composition. It's nice to use
> the fluent API when exploring data in a shell, but it seems to me like that
> API is cumbersome when composing data pipelines of reusable partials. Or
> maybe I'm doing it all wrong... Hence the request for more examples :)
>
> While I'm asking, how might you model this: I have a set of predicates I'd
> like to flatMap over a stream. An input item should be compared vs every
> predicate (basically, I want a Clojure juxt of predicates over each stream
> element). Imagine those predicates expressed as where clauses via the Table
> API. Say I have hundreds of thousands of these predicates to run over every
> stream event. Is the java client API rich enough to express such a flow, or
> should I examine something lower than DataStream?
>
> Thanks a lot, and sorry for all the newb questions.
> -n
>
>
> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>
>> Hey!
>>
>> There is also a collect() sink in the "flink-streaming-contrib" project,
>> see here:
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>
>> It should work well locally for testing. In that case you can write a
>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>> stop reading it once you know the stream is exhausted...
>>
>> Stephan
>>
>>
>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> It seems "type" was what I needed. It also looks like the test
>>> jar has an undeclared dependency. In the end, the following allowed me
>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>
>>> -n
>>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-streaming-core</artifactId>
>>>   <version>${flink.version}</version>
>>>   <type>test-jar</type>
>>>   <scope>test</scope>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-test-utils</artifactId>
>>>   <version>${flink.version}</version>
>>>   <scope>test</scope>
>>> </dependency>
>>>
>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>> > Hi Nick,
>>> >
>>> > we are usually publishing the test artifacts. Can you try and replace the
>>> > <classifier> tag by <type>test-jar</type>:
>>> >
>>> > <dependency>
>>> >   <groupId>org.apache.flink</groupId>
>>> >   <artifactId>flink-streaming-core</artifactId>
>>> >   <version>${flink.version}</version>
>>> >   <type>test-jar</type>
>>> >   <scope>test</scope>
>>> > </dependency>
>>> >
>>> >
>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>> >> produce an input stream of java objects and sink the results into a
>>> >> collection for verification via JUnit asserts.
>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>> >> how to achieve the latter is not evident based on my internet
>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>> >> which is not published to maven.
>>> >>
>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>> >> let me know if I've got it wrong.
>>> >>
>>> >> Thanks,
>>> >> Nick
>>> >>
>>> >> <dependency>
>>> >>   <groupId>org.apache.flink</groupId>
>>> >>   <artifactId>flink-streaming-core</artifactId>
>>> >>   <version>${flink.version}</version>
>>> >>   <classifier>test</classifier>
>>> >>   <scope>test</scope>
>>> >> </dependency>
>>> >
>>> >
>>>
>>
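
For readers following the thread, here is a minimal sketch of one way the race described at the top (parallel sink instances clearing a shared static collection in their open method) might be avoided. It uses plain Java threads rather than Flink. CollectingSink, RESULTS and runPipeline are hypothetical names invented for illustration; they are not Flink classes or the code from the gist. The assumption is that clearing the collection once in the test harness, before any instance starts, and using a synchronized list is enough to make the collected result deterministic. Whether this matches the failure in the gist has not been verified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingSinkSketch {
    // Shared by every parallel sink instance in the same JVM (illustrative only).
    static final List<Integer> RESULTS =
            Collections.synchronizedList(new ArrayList<>());

    // Hypothetical stand-in for one parallel sink instance; not a Flink class.
    static final class CollectingSink {
        void open() {
            // Deliberately does NOT clear RESULTS: a late-starting instance
            // could otherwise wipe records already written by its siblings.
        }

        void invoke(int value) {
            RESULTS.add(value);
        }
    }

    // Simulates a job with `parallelism` sink instances, each on its own thread.
    static List<Integer> runPipeline(int parallelism, int recordsPerInstance) {
        RESULTS.clear(); // cleared once, by the harness, before anything runs
        Thread[] workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            final int base = i * recordsPerInstance;
            workers[i] = new Thread(() -> {
                CollectingSink sink = new CollectingSink();
                sink.open();
                for (int r = 0; r < recordsPerInstance; r++) {
                    sink.invoke(base + r);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread t : workers) {
                t.join(); // read results only after every instance has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for sinks", e);
        }
        List<Integer> snapshot;
        synchronized (RESULTS) {
            snapshot = new ArrayList<>(RESULTS);
        }
        Collections.sort(snapshot); // arrival order across threads is not fixed
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(4, 1000).size());
    }
}
```

Sorting the snapshot before asserting reflects that the order in which parallel instances write is not fixed, so a test would compare contents rather than arrival order.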