Promising observation, Till. Is it possible to access Table API's select and where operators from within such a flatMap?
-n

On Fri, Nov 6, 2015 at 6:19 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Nick,
>
> I think a flatMap operation which is instantiated with your list of
> predicates should do the job. Thus, there shouldn’t be a need to dig deeper
> than the DataStream for the first version.
>
> Cheers,
> Till
>
>
> On Fri, Nov 6, 2015 at 3:58 AM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
>> would be great to have some single-jvm example tests for those of us getting
>> started. Following the example of WindowingIntegrationTest is mostly
>> working, though reusing my single sink instance with its static collection
>> results in non-deterministic results; there appears to be a race between
>> instances clearing the collection in their open method and the runtime
>> returning the collection to my test harness.
>>
>> I'd also appreciate some guidance on stream composition. It's nice to use
>> the fluent API when exploring data in a shell, but it seems to me like that
>> API is cumbersome when composing data pipelines of reusable partials. Or
>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>
>> While I'm asking, how might you model this: I have a set of predicates I'd
>> like to flatMap over a stream. An input item should be compared vs every
>> predicate (basically, I want a Clojure juxt of predicates over each stream
>> element). Imagine those predicates expressed as where clauses via the Table
>> API. Say I have hundreds of thousands of these predicates to run over every
>> stream event. Is the java client API rich enough to express such a flow, or
>> should I examine something lower than DataStream?
>>
>> Thanks a lot, and sorry for all the newb questions.
>> -n
>>
>>
>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>>
>>> Hey!
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project,
>>> see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a
>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>> stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>
>>>> Hi Robert,
>>>>
>>>> It seems "type" was what I needed. It also looks like the test
>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>
>>>> -n
>>>>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-streaming-core</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <type>test-jar</type>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>> <dependency>
>>>>   <groupId>org.apache.flink</groupId>
>>>>   <artifactId>flink-test-utils</artifactId>
>>>>   <version>${flink.version}</version>
>>>>   <scope>test</scope>
>>>> </dependency>
>>>>
>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>>> > Hi Nick,
>>>> >
>>>> > we are usually publishing the test artifacts. Can you try and replace the
>>>> > <classifier> tag by <type>test-jar</type>:
>>>> >
>>>> > <dependency>
>>>> >   <groupId>org.apache.flink</groupId>
>>>> >   <artifactId>flink-streaming-core</artifactId>
>>>> >   <version>${flink.version}</version>
>>>> >   <type>test-jar</type>
>>>> >   <scope>test</scope>
>>>> > </dependency>
>>>> >
>>>> >
>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>>> >> produce an input stream of java objects and sink the results into a
>>>> >> collection for verification via JUnit asserts.
>>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>>> >> how to achieve the latter is not evident based on my internet
>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>> >> which is not published to maven.
>>>> >>
>>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>>> >> let me know if I've got it wrong.
>>>> >>
>>>> >> Thanks,
>>>> >> Nick
>>>> >>
>>>> >> <dependency>
>>>> >>   <groupId>org.apache.flink</groupId>
>>>> >>   <artifactId>flink-streaming-core</artifactId>
>>>> >>   <version>${flink.version}</version>
>>>> >>   <classifier>test</classifier>
>>>> >>   <scope>test</scope>
>>>> >> </dependency>
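
For readers following the thread: Till's suggestion (a flatMap instantiated with the list of predicates) might look roughly like the sketch below. It is plain Java with no Flink dependency, so it only illustrates the "juxt of predicates" logic. The class name PredicateFanOut, the method matches, and the idea of naming each predicate are assumptions made for illustration; none of them come from the thread or from Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper: evaluates every named predicate against one element. */
class PredicateFanOut<T> {
    private final Map<String, Predicate<T>> predicates;

    PredicateFanOut(Map<String, Predicate<T>> predicates) {
        // Copy defensively; LinkedHashMap keeps insertion order so results are deterministic.
        this.predicates = new LinkedHashMap<>(predicates);
    }

    /** Returns the names of all predicates that the element satisfies, in insertion order. */
    List<String> matches(T element) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, Predicate<T>> entry : predicates.entrySet()) {
            if (entry.getValue().test(element)) {
                hits.add(entry.getKey());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Illustrative rules only; real predicates would come from the application.
        Map<String, Predicate<Integer>> rules = new LinkedHashMap<>();
        rules.put("positive", x -> x > 0);
        rules.put("even", x -> x % 2 == 0);
        rules.put("large", x -> x > 100);

        PredicateFanOut<Integer> fanOut = new PredicateFanOut<>(rules);
        System.out.println(fanOut.matches(4));   // [positive, even]
        System.out.println(fanOut.matches(-3));  // []
    }
}
```

In a Flink job, the same loop would presumably live inside a FlatMapFunction's flatMap(value, out) method, emitting one record per matching predicate through out.collect(...). Note that lambdas like the ones above are not serializable by default, so predicates shipped to a cluster would need a serializable representation. Whether a linear scan is acceptable for hundreds of thousands of predicates per event is a separate question that this sketch does not address.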