On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> Thanks Stephan, I'll check that out in the morning. Generally speaking, it
> would be great to have some single-jvm example tests for those of us
> getting started. Following the example of WindowingIntegrationTest is
> mostly working, though reusing my single sink instance with its static
> collection results in non-deterministic results; there appears to be a race
> between instances clearing the collection in their open method and the
> runtime returning the collection to my test harness.


This inconsistent test result is pretty frustrating. I've created a sample
project with an IT that demonstrates the issue. Run `mvn test` multiple
times and see that sometimes it passes and sometimes it fails. Maybe
someone has some thoughts?

https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
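
One way to sidestep the race described above, as a minimal sketch rather than anything Flink provides: keep the shared results in a thread-safe structure keyed by a test id, have the test reset it once before the job starts, and never clear it from a sink's open method. The class name `CollectingResults` and its methods are hypothetical, and the sketch assumes everything runs in a single JVM so that static state is visible to all sink instances. Plain threads stand in for the parallel sink instances here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical helper: per-test result buckets shared by all sink instances in one JVM. */
public class CollectingResults {
    private static final Map<String, Queue<Object>> RESULTS = new ConcurrentHashMap<>();

    /** Called once by the test before the job starts; sink instances never clear anything. */
    public static void reset(String testId) {
        RESULTS.put(testId, new ConcurrentLinkedQueue<>());
    }

    /** Called by each parallel sink instance for every record it receives. */
    public static void add(String testId, Object value) {
        RESULTS.computeIfAbsent(testId, k -> new ConcurrentLinkedQueue<>()).add(value);
    }

    /** Called by the test after the job has finished; returns a copy of the results. */
    public static List<Object> snapshot(String testId) {
        Queue<Object> q = RESULTS.get(testId);
        return q == null ? new ArrayList<>() : new ArrayList<>(q);
    }

    public static void main(String[] args) throws InterruptedException {
        reset("demo");
        // Four threads simulate four parallel sink instances writing concurrently.
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    add("demo", id * 100 + j);
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(snapshot("demo").size()); // prints 400
    }
}
```

Because only the test calls `reset`, a late-starting instance cannot wipe records that another instance has already written.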

Thanks,
Nick

> I'd also appreciate some guidance on stream composition. It's nice to use
> the fluent API when exploring data in a shell, but it seems to me like that
> API is cumbersome when composing data pipelines of reusable partials. Or
> maybe I'm doing it all wrong... Hence the request for more examples :)
>
> While I'm asking, how might you model this: I have a set of predicates I'd
> like to flatMap over a stream. An input item should be compared vs every
> predicate (basically, I want a Clojure juxt of predicates over each stream
> element). Imagine those predicates expressed as where clauses via the Table
> API. Say I have hundreds of thousands of these predicates to run over every
> stream event. Is the java client API rich enough to express such a flow, or
> should I examine something lower than DataStream?
>
> Thanks a lot, and sorry for all the newb questions.
> -n
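
For the predicate question above, here is a minimal plain-Java sketch of the "juxt of predicates" idea: every registered predicate is evaluated against each item and the names of the ones that match are returned. The class `PredicateJuxt` and the predicate names are hypothetical, and nothing here is Flink API. Unlike a strict juxt, which returns one result per predicate, this version returns only the matches, which is closer to what a flatMap would emit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical helper: applies every registered predicate to each item. */
public class PredicateJuxt<T> {
    private final Map<String, Predicate<T>> predicates;

    public PredicateJuxt(Map<String, Predicate<T>> predicates) {
        // Copy into a LinkedHashMap so matches come back in registration order.
        this.predicates = new LinkedHashMap<>(predicates);
    }

    /** Returns the names of all predicates the item satisfies. */
    public List<String> matches(T item) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Predicate<T>> e : predicates.entrySet()) {
            if (e.getValue().test(item)) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> ps = new LinkedHashMap<>();
        ps.put("even", n -> n % 2 == 0);
        ps.put("positive", n -> n > 0);
        ps.put("large", n -> n > 1000);

        PredicateJuxt<Integer> juxt = new PredicateJuxt<>(ps);
        System.out.println(juxt.matches(42)); // prints [even, positive]
        System.out.println(juxt.matches(-3)); // prints []
    }
}
```

The body of `matches` is the part that could be called from a flat-map style function, emitting one output record per matching predicate. Whether a linear scan is acceptable for hundreds of thousands of predicates is a separate question that this sketch does not address.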
>
>
> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>
>> Hey!
>>
>> There is also a collect() sink in the "flink-streaming-contrib" project,
>> see here:
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>
>> It should work well locally for testing. In that case you can write a
>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>> stop reading it once you know the stream is exhausted...
>>
>> Stephan
>>
>>
>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>>> Hi Robert,
>>>
>>> It seems "type" was what I needed. It also looks like the test
>>> jar has an undeclared dependency. In the end, the following allowed me
>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>
>>> -n
>>>
>>>     <dependency>
>>>       <groupId>org.apache.flink</groupId>
>>>       <artifactId>flink-streaming-core</artifactId>
>>>       <version>${flink.version}</version>
>>>       <type>test-jar</type>
>>>       <scope>test</scope>
>>>     </dependency>
>>>     <dependency>
>>>       <groupId>org.apache.flink</groupId>
>>>       <artifactId>flink-test-utils</artifactId>
>>>       <version>${flink.version}</version>
>>>       <scope>test</scope>
>>>     </dependency>
>>>
>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org> wrote:
>>> > Hi Nick,
>>> >
>>> > we are usually publishing the test artifacts. Can you try and replace the
>>> > <classifier> tag by <type>test-jar</type>:
>>> >
>>> > <dependency>
>>> >    <groupId>org.apache.flink</groupId>
>>> >    <artifactId>flink-streaming-core</artifactId>
>>> >    <version>${flink.version}</version>
>>> >    <type>test-jar</type>
>>> >    <scope>test</scope>
>>> > </dependency>
>>> >
>>> >
>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>> >> produce an input stream of java objects and sink the results into a
>>> >> collection for verification via JUnit asserts.
>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>> >> how to achieve the latter is not evident based on my internet
>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>> >> which is not published to maven.
>>> >>
>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>> >> let me know if I've got it wrong.
>>> >>
>>> >> Thanks,
>>> >> Nick
>>> >>
>>> >>     <dependency>
>>> >>       <groupId>org.apache.flink</groupId>
>>> >>       <artifactId>flink-streaming-core</artifactId>
>>> >>       <version>${flink.version}</version>
>>> >>       <classifier>test</classifier>
>>> >>       <scope>test</scope>
>>> >>     </dependency>
>>> >
>>> >
>>>
>>
>>
