[ 
https://issues.apache.org/jira/browse/FLINK-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377601#comment-14377601
 ] 

ASF GitHub Bot commented on FLINK-1595:
---------------------------------------

Github user szape commented on a diff in the pull request:

    https://github.com/apache/flink/pull/520#discussion_r27015100
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
 ---
    @@ -0,0 +1,903 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.complex;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.functions.RichFilterFunction;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import org.apache.flink.api.java.tuple.Tuple5;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.collector.selector.OutputSelector;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.IterativeDataStream;
    +import org.apache.flink.streaming.api.datastream.SplitDataStream;
    +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.function.WindowMapFunction;
    +import org.apache.flink.streaming.api.function.co.CoMapFunction;
    +import org.apache.flink.streaming.api.function.sink.SinkFunction;
    +import org.apache.flink.streaming.api.function.source.SourceFunction;
    +import 
org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
    +import org.apache.flink.streaming.api.windowing.helper.Count;
    +import org.apache.flink.streaming.api.windowing.helper.Delta;
    +import org.apache.flink.streaming.api.windowing.helper.Time;
    +import org.apache.flink.streaming.api.windowing.helper.Timestamp;
    +import org.apache.flink.streaming.util.RectangleClass;
    +import org.apache.flink.streaming.util.TestStreamEnvironment;
    +import org.apache.flink.util.Collector;
    +import org.junit.Test;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +public class ComplexIntegrationTest implements Serializable {
    +   private static final long serialVersionUID = 1L;
    +   private static final long MEMORYSIZE = 32;
    +
    +   private static Map<String, List<String>> results = new HashMap<String, 
List<String>>();
    +
    +   @SuppressWarnings("unchecked")
    +   public static List<Tuple5<Integer, String, Character, Double, Boolean>> 
input = Arrays.asList(
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(1, "apple", 'j', 0.1, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(1, "peach", 'b', 0.8, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(1, "orange", 'c', 0.7, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(2, "apple", 'd', 0.5, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(2, "apple", 'e', 0.6, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(3, "peach", 'a', 0.2, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(6, "peanut", 'b', 0.1, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(7, "banana", 'c', 0.4, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(8, "peanut", 'd', 0.2, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(10, "cherry", 'e', 0.1, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(10, "plum", 'a', 0.5, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(11, "strawberry", 'b', 0.3, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(11, "orange", 'c', 0.3, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(12, "lemon", 'd', 0.9, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(13, "apple", 'e', 0.7, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(15, "lemon", 'a', 0.2, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(16, "apple", 'b', 0.8, true),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(16, "banana", 'c', 0.8, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(16, "orange", 'd', 0.1, false),
    +                   new Tuple5<Integer, String, Character, Double, 
Boolean>(17, "cherry", 'e', 1.0, false));
    +
    +// @Test
    +// public void complexIntegrationTest() throws Exception {
    +//
    +//         MyTimestamp timestamp = new MyTimestamp();
    +//         List<String> resultList1 = new ArrayList<String>();
    +//         //results.put("resultList")
    +//
    +//         StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +//         env.setBufferTimeout(10);
    +//
    +//         DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = 
env.addSource(new TupleSource());
    +//         DataStream<OuterPojo> sourceStream2 = env.addSource(new 
PojoSource());
    +//         DataStream<Long> sourceStream3 = env.generateSequence(1, 50);
    +//         DataStream<Tuple5<Integer, String, Character, Double, Boolean>> 
sourceStream4 = env.fromCollection(input);
    +//         DataStream<Long> sourceStream5 = env.fromElements(10L, 20L, 
30L, 40L, 50L, 60L, 70L, -10L, -20L, -30L);
    +//
    +//         //noinspection unchecked
    +//         env.fromCollection(input)
    +//                         .groupBy(1).sum(3)
    +//                         .groupBy(2).window(Time.of(10, new 
MyTimestamp(), 0)).every(Time.of(5, new MyTimestamp(), 0)).max(3)
    +// .flatten()
    +//                         .merge(sourceStream4.filter(new 
MyFilterFunction()).distribute())
    +//                         .map(new MapFunction<Tuple5<Integer, String, 
Character, Double, Boolean>, Tuple4<Integer, String,
    +//                                         Double, Boolean>>() {
    +//
    +//                                 @Override
    +//                                 public Tuple4<Integer, String, Double, 
Boolean> map(Tuple5<Integer, String, Character, Double,
    +//                                                 Boolean> value) throws 
Exception {
    +//                                         return new Tuple4<Integer, 
String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value
    +//                                                         .f3,
    +//                                                         value.f4);
    +//                                 }
    +//                         }).broadcast().flatMap(new 
MyFlatMapFunction()).connect(sourceStream2).map(new
    +//                         MyCoMapFunction()).addSink(new 
ToListSink(resultList1));
    +//
    +//         IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = 
sourceStream1.sum(0).filter(new FilterFunction
    +//                         <Tuple2<Long, Tuple2<String, Long>>>() {
    +//
    +//                 @Override
    +//                 public boolean filter(Tuple2<Long, Tuple2<String, 
Long>> value) throws Exception {
    +//                         return value.f0 < 20;
    +//                 }
    +//         }).iterate(1000);
    +//
    +//         SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = 
it.map(new IncrementMap()).split(new
    +// MyOutputSelector());
    +//         it.closeWith(step.select("iterate"));
    +//
    +//         step.select("firstOutput").print();
    +//         step.select("secondOutput").print();
    +//
    +//         IterativeDataStream<Tuple3<List<Tuple2<Double, String>>, 
Integer, Double>> it2 = sourceStream4.project(3, 1)
    +// .types(Double.class, String
    +//                         .class).forward().map(new 
MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>,
    +// Integer, Double>>(){
    +//
    +//                 @Override
    +//                 public Tuple3<List<Tuple2<Double, String>>, Integer, 
Double> map(Tuple2<Double, String> value) throws
    +//                                 Exception {
    +//                         List<Tuple2<Double, String>> list = new 
ArrayList<Tuple2<Double, String>>();
    +//                         list.add(value);
    +//                         return new Tuple3<List<Tuple2<Double, String>>, 
Integer, Double>(list, 1, value.f0);
    +//                 }
    +//         }).iterate(5000);
    +//
    +//         SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, 
Double>> boxing =
    +//                         it2.window(Count.of(2))
    +//                         .mapWindow(new 
WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>,
    +//                                         Tuple3<List<Tuple2<Double, 
String>>, Integer, Double>>() {
    +//
    +//                                 @Override
    +//                                 public void 
mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> 
values,
    +//                                                 
Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws 
Exception {
    +//                                         List<Tuple2<Double, String>> 
list = new ArrayList<Tuple2<Double, String>>();
    +//                                         int count = 0;
    +//                                         double quantity = 0.0;
    +//
    +//                                         for (Tuple3<List<Tuple2<Double, 
String>>, Integer, Double> value : values) {
    +//                                                 list.addAll(value.f0);
    +//                                                 count += value.f1;
    +//                                                 quantity += value.f2;
    +//                                         }
    +//                                         out.collect(new 
Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count,
    +//                                                         quantity));
    +//
    +//                                 }
    +//                         })
    +//.flatten().split(new OutputSelector<Tuple3<List<Tuple2<Double, String>>, 
Integer, Double>>(){
    +//
    +//                                 @Override
    +//                                 public Iterable<String> 
select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) {
    +//                                         List<String> output = new 
ArrayList<String>();
    +//                                         if (value.f2 < 2) {
    +//                                                 output.add("iterate");
    +//                                         } else {
    +//                                                 output.add("output");
    +//                                         }
    +//                                         return output;
    +//                                 }
    +//                         });
    +//
    +//         it2.closeWith(boxing.select("iterate"));
    +//
    +//         boxing.select("output").print();
    +//         boxing.select("iterate").print();
    +//
    +//         //sourceStream1.project(0).types(Long.class).print();
    +//         //sourceStream2.groupBy("f0").sum("f1").print();
    +//
    +//         env.execute();
    +//
    +//         System.out.println(resultList1);
    +// }
    +
    +   @Test
    +   public void complexIntegrationTest2() throws Exception {
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +           env.setBufferTimeout(10);
    +
    +           DataStream<Tuple5<Integer, String, Character, Double, Boolean>> 
sourceStream1 = env.fromCollection(input);
    +           DataStream<OuterPojo> sourceStream2 = env.addSource(new 
PojoSource());
    +
    +           //noinspection unchecked
    +           sourceStream1.groupBy(1).sum(3)
    +                           .groupBy(2).window(Time.of(10, new 
MyTimestamp(), 0)).every(Time.of(4, new MyTimestamp(), 0)).max(3)
    +                           .flatten()
    +                           .merge(sourceStream1.filter(new 
MyFilterFunction()).distribute())
    +                           .map(new MapFunction<Tuple5<Integer, String, 
Character, Double, Boolean>, Tuple4<Integer, String,
    +                                           Double, Boolean>>() {
    +
    +                                   @Override
    +                                   public Tuple4<Integer, String, Double, 
Boolean> map(Tuple5<Integer, String, Character, Double,
    +                                                   Boolean> value) throws 
Exception {
    +                                           return new Tuple4<Integer, 
String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value
    +                                                           .f3,
    +                                                           value.f4);
    +                                   }
    +                           }).broadcast().flatMap(new 
MyFlatMapFunction()).connect(sourceStream2).map(new
    +                           MyCoMapFunction()).addSink(new 
ToListSink<String>("test2"));
    +
    +           env.execute();
    +
    +           String result = "water_melon-b, water_melon-b, water_melon-b, 
water_melon-b, water_melon-b, water_melon-b, " +
    +                           "water_melon-b, water_melon-b, water_melon-b, 
water_melon-b, water_melon-b, water_melon-b, " +
    +                           "water_melon-b, " +
    +                           "water_melon-b, water_melon-b, water_melon-b, 
water_melon-b, water_melon-b, water_melon-b, " +
    +                           "water_melon-b, " +
    +                           "peach-b, peach-b, peach-a, peach-a, peach-a, 
peach-a, peanut-b, peanut-b, plum-a, plum-a, orange-c, " +
    +                           "orange-c, " +
    +                           "orange-c, orange-c, apple-b, apple-b";
    +
    +           ArrayList<String> test2Results = new 
ArrayList<String>(Arrays.asList(result.split(", ")));
    +
    +           Collections.sort(test2Results);
    +           Collections.sort(results.get("test2"));
    +           assertEquals(test2Results, results.get("test2"));
    +           System.out.println(results.get("test2"));
    +   }
    +
    +   @Test
    +   public void complexIntegrationTest3() throws Exception {
    +           ArrayList<String> expected1 = new ArrayList<String>();
    +           for (int i = 0; i < 9; i++) {
    +                   expected1.add("(10,(a,1))");
    +           }
    +           ArrayList<String> expected2 = new ArrayList<String>();
    +           for (int i = 0; i < 19; i++) {
    +                   expected2.add("(20,(a,1))");
    +           }
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +           env.setBufferTimeout(10);
    +
    +           DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream = 
env.addSource(new TupleSource());
    +           IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = 
sourceStream.sum(0).filter(new FilterFunction
    +                           <Tuple2<Long, Tuple2<String, Long>>>() {
    +
    +                   @Override
    +                   public boolean filter(Tuple2<Long, Tuple2<String, 
Long>> value) throws Exception {
    +                           return value.f0 < 20;
    +                   }
    +           }).iterate(1000);
    +
    +           SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = 
it.map(new IncrementMap()).split(new
    +                           MyOutputSelector());
    +           it.closeWith(step.select("iterate"));
    +
    +           step.select("firstOutput").addSink(new ToListSink<Tuple2<Long, 
Tuple2<String, Long>>>("test31"));
    +           step.select("secondOutput").addSink(new ToListSink<Tuple2<Long, 
Tuple2<String, Long>>>("test32"));
    +
    +           env.execute();
    +
    +           assertEquals(expected1, results.get("test31"));
    +           assertEquals(expected2, results.get("test32"));
    +
    +   }
    +
    +// non functioning
    +// @Test
    +// public void complexIntegrationTest4() throws Exception {
    +//         StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +//         env.setBufferTimeout(10);
    +//
    +//         DataStream<Tuple5<Integer, String, Character, Double, Boolean>> 
sourceStream = env.fromCollection(input);
    +//
    +//         IterativeDataStream<Tuple3<List<Tuple2<Double, String>>, 
Integer, Double>> it2 = sourceStream.project(3, 1)
    +// .types(Double.class, String
    +//                         .class).forward().map(new 
MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>,
    +// Integer, Double>>(){
    +//
    +//                 @Override
    +//                 public Tuple3<List<Tuple2<Double, String>>, Integer, 
Double> map(Tuple2<Double, String> value) throws
    +//                                 Exception {
    +//                         List<Tuple2<Double, String>> list = new 
ArrayList<Tuple2<Double, String>>();
    +//                         list.add(value);
    +//                         return new Tuple3<List<Tuple2<Double, String>>, 
Integer, Double>(list, 1, value.f0);
    +//                 }
    +//         }).iterate(5000);
    +//
    +//         SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, 
Double>> boxing =
    +//                         it2.window(Count.of(2))
    +//                                         .mapWindow(new 
WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>,
    +//                                                         
Tuple3<List<Tuple2<Double, String>>, Integer, Double>>() {
    +//
    +//                                                 @Override
    +//                                                 public void 
mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>
    +// values,
    +//                                                                 
Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws
    +// Exception {
    +//                                                         
List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>();
    +//                                                         int count = 0;
    +//                                                         double quantity 
= 0.0;
    +//
    +//                                                         for 
(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value : values) {
    +//                                                                 
list.addAll(value.f0);
    +//                                                                 count 
+= value.f1;
    +//                                                                 
quantity += value.f2;
    +//                                                         }
    +//                                                         out.collect(new 
Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count,
    +//                                                                         
quantity));
    +//
    +//                                                 }
    +//                                         })
    +//                                         .flatten().split(new 
OutputSelector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>(){
    +//
    +//                                 @Override
    +//                                 public Iterable<String> 
select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) {
    +//                                         List<String> output = new 
ArrayList<String>();
    +//                                         if (value.f2 < 2) {
    +//                                                 output.add("iterate");
    +//                                         } else {
    +//                                                 output.add("output");
    +//                                         }
    +//                                         return output;
    +//                                 }
    +//                         });
    +//
    +//         it2.closeWith(boxing.select("iterate"));
    +//
    +//         boxing.select("output").print();
    +//
    +//         env.execute();
    +// }
    +
    +   @Test
    +   public void complexIntegrationTest5() throws Exception {
    +           ArrayList<String> expected = new ArrayList<String>();
    +           for (int i = 0; i < 20; i++) {
    +                   expected.add("((" + i / 2 + ",water_melon-b,2,pojo)," + 
(i % 2 + 1) + ")");
    +                   expected.add("((1,a,1,tuple)," + (i + 1) + ")");
    +           }
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +           env.setBufferTimeout(10);
    +
    +           DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = 
env.addSource(new TupleSource());
    +           DataStream<OuterPojo> sourceStream2 = env.addSource(new 
PojoSource());
    +
    +           
sourceStream1.shuffle().connect(sourceStream2.broadcast()).map(new 
StringifyCoMapFunction()).map(new
    +                                                                           
                                                                                
                                                                 
MapFunction<String, Tuple2<String, Short>>() {
    +
    +                   @Override
    +                   public Tuple2<String, Short> map(String value) throws 
Exception {
    +                           return new Tuple2<String, Short>(value, (short) 
1);
    +                   }
    +
    +           }).groupBy(0).sum(1).addSink(new ToListSink<Tuple2<String, 
Short>>("test5"));
    +
    +           env.execute();
    +
    +           Collections.sort(expected);
    +           Collections.sort(results.get("test5"));
    +
    +           assertEquals(expected, results.get("test5"));
    +   }
    +
    +   @Test
    +   public void complexIntegrationTest6() throws Exception {
    +           //heavy traffic with maps and flatmaps
    +
    +           ArrayList<String> expected1 = new ArrayList<String>();
    +           expected1.add("541");
    +           expected1.add("1223");
    +           expected1.add("1987");
    +           expected1.add("2741");
    +           expected1.add("10939");
    +           expected1.add("3571");
    +           expected1.add("4409");
    +           expected1.add("11927");
    +           expected1.add("5279");
    +           expected1.add("6133");
    +           expected1.add("12823");
    +           expected1.add("6997");
    +           expected1.add("7919");
    +           expected1.add("13763");
    +           expected1.add("8831");
    +           expected1.add("9733");
    +           expected1.add("14759");
    +           expected1.add("9973");
    +           expected1.add("15671");
    +           expected1.add("16673");
    +           expected1.add("17659");
    +           expected1.add("18617");
    +           expected1.add("19697");
    +           expected1.add("19997");
    +
    +           ArrayList<String> expected2 = new ArrayList<String>();
    +           for (int i = 2; i < 100; i++) {
    +                   expected2.add("(" + i + "," + 20000 / i + ")");
    +           }
    +           for (int i = 19901; i <= 20000; i++) {
    +                   expected2.add("(" + i + "," + 20000 / i + ")");
    +           }
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
MEMORYSIZE);
    +           env.setBufferTimeout(1);
    +
    +           DataStream<Long> sourceStream1 = env.generateSequence(1, 10000);
    +           DataStream<Long> sourceStream2 = env.generateSequence(10001, 
20000);
    +
    +           //noinspection unchecked
    +           sourceStream1.filter(new PrimeFilterFunction())
    +                           .window(Count.of(100))
    +                           .max(0).flatten()
    +                           .merge(sourceStream2.filter(new 
PrimeFilterFunction())
    +                                           .window(Count.of(100))
    +                                           .max(0).flatten())
    +                           .addSink(new ToListSink<Long>("test61"));
    +           sourceStream1.flatMap(new DivisorsFlatMapFunction())
    +                           .merge(sourceStream2.flatMap(new 
DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long,
    +                           Integer>>() {
    +
    +                   @Override
    +                   public Tuple2<Long, Integer> map(Long value) throws 
Exception {
    +                           return new Tuple2<Long, Integer>(value, 1);
    +                   }
    +           })
    +                           .groupBy(0)
    +                           .sum(1)
    +                           .groupBy(0)
    +                           .window(Count.of(10000)).max(1).flatten()
    +                           .filter(new FilterFunction<Tuple2<Long, 
Integer>>() {
    +
    +                                   @Override
    +                                   public boolean filter(Tuple2<Long, 
Integer> value) throws Exception {
    +                                           return value.f0 < 100 || 
value.f0 > 19900;
    +                                   }
    +                           })
    +                           .addSink(new ToListSink<Tuple2<Long, 
Integer>>("test62"));
    +
    +           env.execute();
    +
    +           Collections.sort(expected1);
    +           Collections.sort(expected2);
    +           Collections.sort(results.get("test61"));
    +           Collections.sort(results.get("test62"));
    +
    +           assertEquals(expected1, results.get("test61"));
    +           assertEquals(expected2, results.get("test62"));
    +
    +   }
    +
    +   @Test
    +   public void complexIntegrationTest7() throws Exception {
    +
    +           ArrayList<String> expected = new ArrayList<String>();
    +           expected.add("((100,100),0)");
    +           expected.add("((120,122),5)");
    +           expected.add("((121,125),6)");
    +           expected.add("((138,144),9)");
    +           expected.add("((139,147),10)");
    +           expected.add("((156,166),13)");
    +           expected.add("((157,169),14)");
    +           expected.add("((174,188),17)");
    +           expected.add("((175,191),18)");
    +           expected.add("((192,210),21)");
    +           expected.add("((193,213),22)");
    +           expected.add("((210,232),25)");
    +           expected.add("((211,235),26)");
    +           expected.add("((228,254),29)");
    +           expected.add("((229,257),30)");
    +           expected.add("((246,276),33)");
    +           expected.add("((247,279),34)");
    +           expected.add("((264,298),37)");
    +           expected.add("((265,301),38)");
    +           expected.add("((282,320),41)");
    +           expected.add("((283,323),42)");
    +           expected.add("((300,342),45)");
    +           expected.add("((301,345),46)");
    +           expected.add("((318,364),49)");
    +           expected.add("((319,367),50)");
    +           expected.add("((336,386),53)");
    +           expected.add("((337,389),54)");
    +           expected.add("((354,408),57)");
    +           expected.add("((355,411),58)");
    +           expected.add("((372,430),61)");
    +           expected.add("((373,433),62)");
    +           expected.add("((390,452),65)");
    +           expected.add("((391,455),66)");
    +           expected.add("((408,474),69)");
    +           expected.add("((409,477),70)");
    +           expected.add("((426,496),73)");
    +           expected.add("((427,499),74)");
    +           expected.add("((444,518),77)");
    +           expected.add("((445,521),78)");
    +           expected.add("((462,540),81)");
    +           expected.add("((463,543),82)");
    +           expected.add("((480,562),85)");
    +           expected.add("((481,565),86)");
    +           expected.add("((498,584),89)");
    +           expected.add("((499,587),90)");
    +           expected.add("((516,606),93)");
    +           expected.add("((517,609),94)");
    +           expected.add("((534,628),97)");
    +           expected.add("((535,631),98)");
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
32);
    +           env.setBufferTimeout(100);
    +
    +           env.addSource(new RectangleSource()).global()
    +                           .map(new RectangleMapFunction())
    +                           .window(Delta.of(0.0, new 
DeltaFunction<Tuple2<RectangleClass, Integer>>() {
    +
    +                                   @Override
    +                                   public double 
getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass,
    +                                                   Integer> newDataPoint) {
    +                                           return (newDataPoint.f0.b - 
newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a);
    +                                   }
    +                           }, new Tuple2<RectangleClass, Integer>(new 
RectangleClass(100, 100), 0)))
    +                           .mapWindow(new MyWindowMapFunction())
    +                           .flatten()
    +                           .addSink(new ToListSink<Tuple2<RectangleClass, 
Integer>>("test7"));
    +
    +           env.execute();
    +
    +           Collections.sort(expected);
    +           Collections.sort(results.get("test7"));
    +           assertEquals(expected, results.get("test7"));
    +   }
    +
    +   @Test
    +   public void complexIntegrationTest8() throws Exception {
    +
    +           ArrayList<String> expected = new ArrayList<String>();
    +
    +           StreamExecutionEnvironment env = new TestStreamEnvironment(1, 
32);
    +           env.setBufferTimeout(1);
    +
    +           SplitDataStream<Long> splittedStream1 = env.generateSequence(1, 
10)
    +                           .map(new MapFunction<Long, Long>() {
    +
    +                                   @Override
    +                                   public Long map(Long value) throws 
Exception {
    +                                           return value;
    +                                   }
    +                           })
    +                           .filter(new FilterFunction<Long>() {
    +
    +                                   @Override
    +                                   public boolean filter(Long value) 
throws Exception {
    +                                           return true;
    +                                   }
    +                           })
    +                           .flatMap(new SquareFlatMapFunction())
    +                           .split(new OutputSelector<Long>() {
    +
    +                                   @Override
    +                                   public Iterable<String> select(Long 
value) {
    +                                           ArrayList<String> output = new 
ArrayList<String>();
    +                                           if(value < 8) {
    +                                                   output.add("first");
    +                                           } else {
    +                                                   output.add("second");
    +                                           }
    +                                           return output;
    +                                   }
    +                           });
    +
    +           //splittedStream1.select("second").print();
    +
    +           SplitDataStream<Long> splittedStream2 = 
splittedStream1.select("first").split(new OutputSelector<Long>() {
    +
    +                   @Override
    +                   public Iterable<String> select(Long value) {
    +                           ArrayList<String> output = new 
ArrayList<String>();
    +                           if(value < 5) {
    +                                   output.add("third");
    +                           } else {
    +                                   output.add("fourth");
    +                           }
    +                           return output;
    +                   }
    +           });
    +
    +           DataStream<Long> dataStream3 = 
splittedStream1.select("second").map(new MapFunction<Long, Long>() {
    +
    +                   @Override
    +                   public Long map(Long value) throws Exception {
    +                           return value;
    +                   }
    +           });
    +
    +           dataStream3.print();
    +           splittedStream2.select("third").print();
    +           // BUG!!!
    +           dataStream3.merge(splittedStream2.select("third")).print();
    +
    +           //splittedStream2.select("fourth").print();
    +
    +           //.addSink(new ToListSink<Long>("test8"));
    +
    +           env.execute();
    +
    +           //Collections.sort(expected);
    +           //Collections.sort(results.get("test8"));
    +           //assertEquals(expected, results.get("test8"));
    +   }
    +
    +   public static class InnerPojo {
    +           public Long f0;
    --- End diff --
    
    Agreed, I will include java collections, date-types in some tests.


> Add a complex integration test for Streaming API
> ------------------------------------------------
>
>                 Key: FLINK-1595
>                 URL: https://issues.apache.org/jira/browse/FLINK-1595
>             Project: Flink
>          Issue Type: Task
>          Components: Streaming
>            Reporter: Gyula Fora
>            Assignee: Péter Szabó
>              Labels: Starter
>
> The streaming tests currently lack a sophisticated integration test that 
> would test many api features at once. 
> This should include different merging, partitioning, grouping, aggregation 
> types, as well as windowing and connected operators.
> The results should be tested for correctness.
> A test like this would help identifying bugs that are hard to detect by 
> unit-tests.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to