There is no global order in parallel streams; it is something that
applications need to work with. We are thinking about adding operations to
introduce event-time order (at the cost of some delay), but those are only
plans at this point.


What I do in my tests is run the test streams in parallel, but run the sink
with a degree of parallelism (DOP) of 1. The sink gathers the elements in a
list, and the close() function validates the result.

Validating the results may involve sorting the list in which the elements
were gathered (to make the order deterministic), or using a hash set if it is
only about the distinct count.
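
As a rough illustration of that validation step, here is a minimal sketch in
plain Java. It does not use the Flink API: the thread pool only stands in for
the parallel part of a test stream, the shared synchronized list stands in for
the list kept by the DOP 1 sink, and all class and method names are made up
for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class UnorderedResultCheck {

    // Stand-in for parallel operators feeding a single gathering sink:
    // each element is doubled on some worker thread, so arrival order varies.
    static List<Integer> gatherInParallel(List<Integer> input, int workers) {
        List<Integer> gathered = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (Integer element : input) {
            pool.execute(() -> gathered.add(element * 2));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return gathered;
    }

    // Sorting a copy makes the comparison independent of arrival order.
    static <T extends Comparable<T>> List<T> sortedCopy(List<T> gathered) {
        List<T> copy = new ArrayList<>(gathered);
        Collections.sort(copy);
        return copy;
    }

    // A hash set is enough when only the number of distinct elements matters.
    static <T> int distinctCount(List<T> gathered) {
        return new HashSet<>(gathered).size();
    }

    public static void main(String[] args) {
        List<Integer> gathered = gatherInParallel(Arrays.asList(3, 1, 2, 2), 4);
        System.out.println(sortedCopy(gathered));    // [2, 4, 4, 6]
        System.out.println(distinctCount(gathered)); // 3
    }
}
```

In a real test, the two checks would run wherever the gathered list is
validated, for example in the sink's close() function as described above.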

Hope that helps.

On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:

> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Thanks Stephan, I'll check that out in the morning. Generally speaking,
>> it would be great to have some single-jvm example tests for those of us
>> getting started. Following the example of WindowingIntegrationTest is
>> mostly working, though reusing my single sink instance with its static
>> collection results in non-deterministic results; there appears to be a race
>> between instances clearing the collection in their open method and the
>> runtime returning the collection to my test harness.
>
>
> This inconsistent test result is pretty frustrating. I've created a sample
> project with an IT that demonstrates the issue. Run `mvn test` multiple
> times and see that sometimes it passes and sometimes it fails. Maybe
> someone has some thoughts?
>
> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>
> Thanks,
> Nick
>
>> I'd also appreciate some guidance on stream composition. It's nice to use
>> the fluent API when exploring data in a shell, but it seems to me like that
>> API is cumbersome when composing data pipelines of reusable partials. Or
>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>
>> While I'm asking, how might you model this: I have a set of predicates
>> I'd like to flatMap over a stream. An input item should be compared vs
>> every predicate (basically, I want a Clojure juxt of predicates over each
>> stream element). Imagine those predicates expressed as where clauses via
>> the Table API. Say I have hundreds of thousands of these predicates to run
>> over every stream event. Is the java client API rich enough to express such
>> a flow, or should I examine something lower than DataStream?
>>
>> Thanks a lot, and sorry for all the newb questions.
>> -n
>>
>>
>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hey!
>>>
>>> There is also a collect() sink in the "flink-streaming-contrib" project,
>>> see here:
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>
>>> It should work well locally for testing. In that case you can write a
>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>> stop reading it once you know the stream is exhausted...
>>>
>>> Stephan
>>>
>>>
>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> It seems "type" was what I needed. It also looks like the test
>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>
>>>> -n
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-streaming-core</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <type>test-jar</type>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>     <dependency>
>>>>       <groupId>org.apache.flink</groupId>
>>>>       <artifactId>flink-test-utils</artifactId>
>>>>       <version>${flink.version}</version>
>>>>       <scope>test</scope>
>>>>     </dependency>
>>>>
>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org>
>>>> wrote:
>>>> > Hi Nick,
>>>> >
>>>> > we are usually publishing the test artifacts. Can you try to replace the
>>>> > <classifier> tag with <type>test-jar</type>:
>>>> >
>>>> > <dependency>
>>>> >    <groupId>org.apache.flink</groupId>
>>>> >    <artifactId>flink-streaming-core</artifactId>
>>>> >    <version>${flink.version}</version>
>>>> >    <type>test-jar</type>
>>>> >    <scope>test</scope>
>>>> > </dependency>
>>>> >
>>>> >
>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> Hello,
>>>> >>
>>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>>> >> produce an input stream of java objects and sink the results into a
>>>> >> collection for verification via JUnit asserts.
>>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>>> >> how to achieve the latter is not evident based on my internet
>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>> >> class, ie, as used by WindowingIntegrationTest. However, this class
>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>> >> which is not published to maven.
>>>> >>
>>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>>> >> let me know if I've got it wrong.
>>>> >>
>>>> >> Thanks,
>>>> >> Nick
>>>> >>
>>>> >>     <dependency>
>>>> >>       <groupId>org.apache.flink</groupId>
>>>> >>       <artifactId>flink-streaming-core</artifactId>
>>>> >>       <version>${flink.version}</version>
>>>> >>       <classifier>test</classifier>
>>>> >>       <scope>test</scope>
>>>> >>     </dependency>
>>>> >
>>>> >
>>>>
>>>
>>>
>
