Please see the above gist: my test makes no assertions until after the
env.execute() call. Adding setParallelism(1) to my sink appears to
stabilize my test. Indeed, very helpful. Thanks a lot!
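
For anyone who lands here later, the fix amounts to something like the
following sketch (CollectingSink is a stand-in for the sink in my gist,
not a real class; imports omitted):

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment();
    env.fromElements("a", "b", "c")
       .addSink(new CollectingSink())  // gathers elements in a static list
       .setParallelism(1);             // one instance -> no open() race
    env.execute();
    // JUnit assertions against the collected results are now safe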

-n

On Wed, Nov 18, 2015 at 9:15 AM, Stephan Ewen <se...@apache.org> wrote:

> Okay, I think I misunderstood your problem.
>
> Usually you can simply execute tests one after another by waiting until
> "env.execute()" returns. Streaming jobs terminate by themselves once
> their sources reach the end of the stream (finite streams are supported
> that way), and they make sure all records flow through the entire stream
> first (no barrier needed). So if you use a source that reaches an end,
> no extra work is needed.
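>
> As a minimal sketch (using a finite in-memory source; the sink is
> arbitrary):
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment();
>     env.fromElements(1, 2, 3)   // finite source
>        .print();                // any sink works
>     env.execute();              // returns only after the pipeline drains
>     // every record has passed through the whole stream at this point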
>
> If you have a proper infinite source (like Kafka), things are a bit more
> tricky, since you then have a genuinely infinite streaming program. What
> we do in our Kafka tests is throw a "SuccessException" in the sink once
> we have seen all the data we expect. You can inspect the cause exceptions
> in a try/catch around env.execute() to check whether the program "failed"
> with a SuccessException or whether it failed for a real reason.
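>
> Sketched out, the pattern looks roughly like this (SuccessException,
> EXPECTED_COUNT, and the String element type are illustrative, not our
> actual test code; imports omitted):
>
>     // Thrown by the sink to signal that all expected data has arrived.
>     public static class SuccessException extends RuntimeException {}
>
>     stream.addSink(new SinkFunction<String>() {
>         private int seen;
>         @Override
>         public void invoke(String value) {
>             if (++seen == EXPECTED_COUNT) {  // EXPECTED_COUNT: test-specific
>                 throw new SuccessException();
>             }
>         }
>     });
>
>     try {
>         env.execute();
>     } catch (Exception e) {
>         // walk the cause chain: a SuccessException means the test passed
>         for (Throwable t = e; t != null; t = t.getCause()) {
>             if (t instanceof SuccessException) {
>                 return;
>             }
>         }
>         throw e;  // anything else is a genuine failure
>     }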
>
> A "sink in DOP 1" (sorry for the unclear terminology) is a sink with
> parallelism 1, so all data is collected by the same function instance.
>
> Any of this helpful?
>
> Stephan
>
>
> On Wed, Nov 18, 2015 at 5:13 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
>> Sorry Stephan, but I don't follow how global order applies in my case. I'm
>> merely checking the size of the sink results. I assume all tuples from a
>> given test invocation have sunk before the next test begins, which is
>> clearly not the case. Is there a way I can place a barrier in my tests to
>> ensure one streaming DAG runs at a time, and that all buffers have been
>> flushed to the sink before the next test begins?
>>
>> What is "sink in DOP 1"?
>>
>> Thanks,
>> Nick
>>
>>
>> On Wednesday, November 18, 2015, Stephan Ewen <se...@apache.org> wrote:
>>
>>> There is no global order in parallel streams; it is something that
>>> applications need to work with. We are thinking about adding operations
>>> to introduce event-time order (at the cost of some delay), but those are
>>> only plans at this point.
>>>
>>>
>>> What I do in my tests is run the test streams in parallel, but the Sink
>>> in DOP 1. The sink gathers the elements in a list, and the close() function
>>> validates the result.
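>>>
>>> Roughly like this (the element type and the expected-values check are
>>> placeholders for whatever the test produces; imports omitted):
>>>
>>>     // Parallelism 1: a single instance sees every element.
>>>     public static class ValidatingSink extends RichSinkFunction<String> {
>>>         private final List<String> elements = new ArrayList<>();
>>>
>>>         @Override
>>>         public void invoke(String value) {
>>>             elements.add(value);
>>>         }
>>>
>>>         @Override
>>>         public void close() {
>>>             // sort to make the order deterministic, then validate
>>>             Collections.sort(elements);
>>>             // e.g. assertEquals(expected, elements);
>>>         }
>>>     }
>>>
>>>     stream.addSink(new ValidatingSink()).setParallelism(1);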
>>>
>>> Validating the results may involve sorting the list in which the
>>> elements were gathered (to make the order deterministic), or using a
>>> hash set if it is only about distinct counts.
>>>
>>> Hope that helps.
>>>
>>> On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <ndimi...@gmail.com>
>>> wrote:
>>>
>>>> On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Stephan, I'll check that out in the morning. Generally
>>>>> speaking, it would be great to have some single-JVM example tests for
>>>>> those of us getting started. Following the example of
>>>>> WindowingIntegrationTest is mostly working, though reusing my single
>>>>> sink instance with its static collection produces non-deterministic
>>>>> results; there appears to be a race between instances clearing the
>>>>> collection in their open() method and the runtime returning the
>>>>> collection to my test harness.
>>>>
>>>>
>>>> This inconsistent test result is pretty frustrating. I've created a
>>>> sample project with an IT that demonstrates the issue. Run `mvn test`
>>>> multiple times and see that sometimes it passes and sometimes it fails.
>>>> Maybe someone has some thoughts?
>>>>
>>>> https://gist.github.com/ndimiduk/5f3b4757eb772feed6e6
>>>>
>>>> Thanks,
>>>> Nick
>>>>
>>>> I'd also appreciate some guidance on stream composition. It's nice to
>>>>> use the fluent API when exploring data in a shell, but it seems to me like
>>>>> that API is cumbersome when composing data pipelines of reusable partials.
>>>>> Or maybe I'm doing it all wrong... Hence the request for more examples :)
>>>>>
>>>>> While I'm asking, how might you model this: I have a set of predicates
>>>>> I'd like to flatMap over a stream. An input item should be compared
>>>>> against every predicate (basically, I want a Clojure-style juxt of
>>>>> predicates over each stream element). Imagine those predicates
>>>>> expressed as where clauses via the Table API. Say I have hundreds of
>>>>> thousands of these predicates to run over every stream event. Is the
>>>>> Java client API rich enough to express such a flow, or should I
>>>>> examine something lower-level than DataStream?
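>>>>>
>>>>> Something like this sketch is what I have in mind (Event,
>>>>> loadPredicates(), and events as a DataStream<Event> are all
>>>>> hypothetical; imports omitted):
>>>>>
>>>>>     // Fan each event out against every predicate, emitting one record
>>>>>     // per match -- a "juxt" over the predicate set. Assumes the
>>>>>     // predicate list is serializable so it ships to the workers.
>>>>>     final List<FilterFunction<Event>> predicates = loadPredicates();
>>>>>     events.flatMap(new FlatMapFunction<Event, Tuple2<Integer, Event>>() {
>>>>>         @Override
>>>>>         public void flatMap(Event e, Collector<Tuple2<Integer, Event>> out)
>>>>>                 throws Exception {
>>>>>             for (int i = 0; i < predicates.size(); i++) {
>>>>>                 if (predicates.get(i).filter(e)) {
>>>>>                     out.collect(Tuple2.of(i, e));  // predicate id + event
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>     });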
>>>>>
>>>>> Thanks a lot, and sorry for all the newb questions.
>>>>> -n
>>>>>
>>>>>
>>>>> On Thursday, November 5, 2015, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hey!
>>>>>>
>>>>>> There is also a collect() sink in the "flink-streaming-contrib"
>>>>>> project, see here:
>>>>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
>>>>>>
>>>>>> It should work well locally for testing. In that case you can write a
>>>>>> program as usual and use "DataStreamUtils.collect(stream)"; you just
>>>>>> need to stop reading from it once you know the stream is exhausted...
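>>>>>>
>>>>>> For example (a sketch with a finite source; exception handling and
>>>>>> imports omitted):
>>>>>>
>>>>>>     StreamExecutionEnvironment env =
>>>>>>         StreamExecutionEnvironment.createLocalEnvironment();
>>>>>>     DataStream<Long> stream = env.generateSequence(1, 100);
>>>>>>
>>>>>>     // The iterator ends when the finite source is exhausted; with an
>>>>>>     // infinite source you must decide when to stop reading.
>>>>>>     Iterator<Long> it = DataStreamUtils.collect(stream);
>>>>>>     List<Long> results = new ArrayList<>();
>>>>>>     while (it.hasNext()) {
>>>>>>         results.add(it.next());
>>>>>>     }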
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Robert,
>>>>>>>
>>>>>>> It seems "type" was what I needed. It also looks like the test jar
>>>>>>> has an undeclared dependency. In the end, the following allowed me
>>>>>>> to use TestStreamEnvironment for my integration test. Thanks a lot!
>>>>>>>
>>>>>>> -n
>>>>>>>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>>       <artifactId>flink-streaming-core</artifactId>
>>>>>>>       <version>${flink.version}</version>
>>>>>>>       <type>test-jar</type>
>>>>>>>       <scope>test</scope>
>>>>>>>     </dependency>
>>>>>>>     <dependency>
>>>>>>>       <groupId>org.apache.flink</groupId>
>>>>>>>       <artifactId>flink-test-utils</artifactId>
>>>>>>>       <version>${flink.version}</version>
>>>>>>>       <scope>test</scope>
>>>>>>>     </dependency>
>>>>>>>
>>>>>>> On Thu, Nov 5, 2015 at 12:47 PM, Robert Metzger <rmetz...@apache.org>
>>>>>>> wrote:
>>>>>>> > Hi Nick,
>>>>>>> >
>>>>>>> > we usually publish the test artifacts. Can you try replacing the
>>>>>>> > <classifier> tag with <type>test-jar</type>:
>>>>>>> >
>>>>>>> > <dependency>
>>>>>>> >    <groupId>org.apache.flink</groupId>
>>>>>>> >    <artifactId>flink-streaming-core</artifactId>
>>>>>>> >    <version>${flink.version}</version>
>>>>>>> >    <type>test-jar</type>
>>>>>>> >    <scope>test</scope>
>>>>>>> > </dependency>
>>>>>>> >
>>>>>>> >
>>>>>>> > On Thu, Nov 5, 2015 at 8:18 PM, Nick Dimiduk <ndimi...@gmail.com>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hello,
>>>>>>> >>
>>>>>>> >> I'm attempting integration tests for my streaming flows. I'd like
>>>>>>> >> to produce an input stream of Java objects and sink the results
>>>>>>> >> into a collection for verification via JUnit asserts.
>>>>>>> >> StreamExecutionEnvironment provides methods for the former;
>>>>>>> >> however, how to achieve the latter is not evident from my internet
>>>>>>> >> searching. I think I've found a solution in the
>>>>>>> >> TestStreamEnvironment class, i.e., as used by
>>>>>>> >> WindowingIntegrationTest. However, this class appears to be
>>>>>>> >> packaged in the flink-streaming-core test artifact, which is not
>>>>>>> >> published to Maven.
>>>>>>> >>
>>>>>>> >> For reference, this is the Maven dependency stanza I'm using.
>>>>>>> >> Please let me know if I've got it wrong.
>>>>>>> >>
>>>>>>> >> Thanks,
>>>>>>> >> Nick
>>>>>>> >>
>>>>>>> >>     <dependency>
>>>>>>> >>       <groupId>org.apache.flink</groupId>
>>>>>>> >>       <artifactId>flink-streaming-core</artifactId>
>>>>>>> >>       <version>${flink.version}</version>
>>>>>>> >>       <classifier>test</classifier>
>>>>>>> >>       <scope>test</scope>
>>>>>>> >>     </dependency>
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>
