Sorry Stephan but I don't follow how global order applies in my case. I'm
merely checking the size of the sink results. I assume all tuples from a
given test invocation have sunk before the next test begins, which is
clearly not the case. Is there a way I can place a barrier in my tests to
ensure one streaming DAG runs at a time, and that all buffers have been
flushed to the sink before the next test begins?

What is "sink in DOP 1"?

Thanks,
Nick

On Wednesday, November 18, 2015, Stephan Ewen <se...@apache.org> wrote:

> There is no global order in parallel streams, it is something that
> applications need to work with. We are thinking about adding operations to
> introduce event-time order (at the cost of some delay), but those are only
> plans at this point.
>
>
> What I do in my tests is run the test streams in parallel, but the Sink in
> DOP 1. The sink gathers the elements in a list, and the close() function
> validates the result.
>
> Validating the results may involve sorting the list where elements were
> gathered (to make the order deterministic) or using a hash set if it is only
> about distinct count.
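
For illustration, the pattern described above might look roughly like the following plain-Java sketch. It uses no Flink classes: worker threads stand in for the parallel test stream, and the hypothetical CollectingSink stands in for a sink running with a degree of parallelism (DOP) of 1, which in Flink would typically be requested with setParallelism(1) on the sink. The class and method names are assumptions made for this example only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java stand-in: parallel producers, one collecting sink,
// validation only after every producer has finished.
class SingleSinkSketch {

    // Hypothetical stand-in for a sink instance running with parallelism 1.
    static final class CollectingSink {
        private final List<Integer> elements = new ArrayList<>();

        // Called once per element, possibly from several producer threads.
        synchronized void invoke(int value) {
            elements.add(value);
        }

        // Called once at the end, like close() on the sink.
        synchronized List<Integer> close() {
            List<Integer> sorted = new ArrayList<>(elements);
            Collections.sort(sorted); // arrival order is arbitrary, so sort before comparing
            return sorted;
        }
    }

    static List<Integer> runOnce(int parallelism, int perTask) {
        CollectingSink sink = new CollectingSink(); // fresh per run, no static state
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        for (int task = 0; task < parallelism; task++) {
            final int offset = task * perTask;
            pool.submit(() -> {
                for (int i = 0; i < perTask; i++) {
                    sink.invoke(offset + i);
                }
            });
        }
        pool.shutdown();
        try {
            // Barrier: do not validate until all producers are done.
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("producers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sink.close();
    }

    public static void main(String[] args) {
        List<Integer> result = runOnce(4, 250);
        Set<Integer> distinct = new HashSet<>(result); // enough if only the distinct count matters
        System.out.println("count=" + result.size() + " distinct=" + distinct.size());
    }
}
```

Because each run creates its own sink and waits for all producers before reading it, a second run cannot observe elements from the first. Whether the same isolation holds inside a real streaming job depends on how the sink instances are created and shared there.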
>
> Hope that helps.
>
> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>
>>> Thanks Stephan, I'll check that out in the morning. Generally speaking,
>>> it would be great to have some single-jvm example tests for those of us
>>> getting started. Following the example of WindowingIntegrationTest is
>>> mostly working, though reusing my single sink instance with its static
>>> collection results in non-deterministic results; there appears to be a race
>>> between instances clearing the collection in their open method and the
>>> runtime returning the collection to my test harness.
>>
>>
>> This inconsistent test result is pretty frustrating. I've created a
>> sample project with an IT that demonstrates the issue. Run `mvn test`
>> multiple times and see that sometimes it passes and sometimes it fails.
>> Maybe someone has some thoughts?
>>
>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>
>> Thanks,
>> Nick
>>
>>> I'd also appreciate some guidance on stream composition. It's nice to use
>>> the fluent API when exploring data in a shell, but it seems to me like that
>>> API is cumbersome when composing data pipelines of reusable partials. Or
>>> maybe I'm doing it all wrong... Hence the request for more examples :)
>>>
>>> While I'm asking, how might you model this: I have a set of predicates
>>> I'd like to flatMap over a stream. An input item should be compared vs
>>> every predicate (basically, I want a Clojure juxt of predicates over each
>>> stream element). Imagine those predicates expressed as where clauses via
>>> the Table API. Say I have hundreds of thousands of these predicates to run
>>> over every stream event. Is the java client API rich enough to express such
>>> a flow, or should I examine something lower than DataStream?
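
As a rough illustration of the "juxt of predicates" idea in the question above, the following plain-Java sketch applies every predicate to one input element and returns the names of those that match. It does not use the Flink API; the class name, the matches helper, and the sample predicates are all assumptions made for this example. In a flatMap, the body of matches would run once per stream element and emit one record per hit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java sketch: evaluate every named predicate against a single event.
class PredicateFanOutSketch {

    // Returns the names of all predicates that accept the event, in map order.
    static <T> List<String> matches(Map<String, Predicate<T>> predicates, T event) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<String, Predicate<T>> entry : predicates.entrySet()) {
            if (entry.getValue().test(event)) {
                hits.add(entry.getKey());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Hypothetical predicates; a real job would load these from elsewhere.
        Map<String, Predicate<Integer>> predicates = new LinkedHashMap<>();
        predicates.put("even", n -> n % 2 == 0);
        predicates.put("positive", n -> n > 0);
        predicates.put("large", n -> n > 100);

        for (int event : new int[] {4, -3, 250}) {
            System.out.println(event + " -> " + matches(predicates, event));
        }
        // prints:
        // 4 -> [even, positive]
        // -3 -> []
        // 250 -> [even, positive, large]
    }
}
```

This is a linear scan per element, so the cost grows with the number of predicates. Whether that is acceptable for hundreds of thousands of predicates is not something this sketch answers.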
>>>
>>> Thanks a lot, and sorry for all the newb questions.
>>> -n
>>>
>>>
>>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hey!
>>>>
>>>> There is also a collect() sink in the "flink-streaming-contrib"
>>>> project, see here:
>>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>>
>>>> It should work well locally for testing. In that case you can write a
>>>> program as usual and use "DataStreamUtils.collect(stream)", so you need to
>>>> stop reading it once you know the stream is exhausted...
>>>>
>>>> Stephan
>>>>
>>>>
>>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Robert,
>>>>>
>>>>> It seems "type" was what I needed. It also looks like the test
>>>>> jar has an undeclared dependency. In the end, the following allowed me
>>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>>
>>>>> -n
>>>>>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.flink</groupId>
>>>>>       <artifactId>flink-streaming-core</artifactId>
>>>>>       <version>${flink.version}</version>
>>>>>       <type>test-jar</type>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>     <dependency>
>>>>>       <groupId>org.apache.flink</groupId>
>>>>>       <artifactId>flink-test-utils</artifactId>
>>>>>       <version>${flink.version}</version>
>>>>>       <scope>test</scope>
>>>>>     </dependency>
>>>>>
>>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org>
>>>>> wrote:
>>>>> > Hi Nick,
>>>>> >
>>>>> > we are usually publishing the test artifacts. Can you try and replace the
>>>>> > <classifier> tag by <type>test-jar</type>:
>>>>> >
>>>>> > <dependency>
>>>>> >    <groupId>org.apache.flink</groupId>
>>>>> >    <artifactId>flink-streaming-core</artifactId>
>>>>> >    <version>${flink.version}</version>
>>>>> >    <type>test-jar</type>
>>>>> >    <scope>test</scope>
>>>>> > </dependency>
>>>>> >
>>>>> >
>>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>>>>> >>
>>>>> >> Hello,
>>>>> >>
>>>>> >> I'm attempting integration tests for my streaming flows. I'd like to
>>>>> >> produce an input stream of java objects and sink the results into a
>>>>> >> collection for verification via JUnit asserts.
>>>>> >> StreamExecutionEnvironment provides methods for the former, however,
>>>>> >> how to achieve the latter is not evident based on my internet
>>>>> >> searching. I think I've found a solution in the TestStreamEnvironment
>>>>> >> class, i.e., as used by WindowingIntegrationTest. However, this class
>>>>> >> appears to be packaged in the flink-streaming-core test artifact,
>>>>> >> which is not published to maven.
>>>>> >>
>>>>> >> For reference, this is the maven dependency stanza I'm using. Please
>>>>> >> let me know if I've got it wrong.
>>>>> >>
>>>>> >> Thanks,
>>>>> >> Nick
>>>>> >>
>>>>> >>     <dependency>
>>>>> >>       <groupId>org.apache.flink</groupId>
>>>>> >>       <artifactId>flink-streaming-core</artifactId>
>>>>> >>       <version>${flink.version}</version>
>>>>> >>       <classifier>test</classifier>
>>>>> >>       <scope>test</scope>
>>>>> >>     </dependency>
>>>>> >
>>>>> >
>>>>>
>>>>
>>>>
>>
>
