[ https://issues.apache.org/jira/browse/FLINK-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376437#comment-14376437 ]
ASF GitHub Bot commented on FLINK-1595: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/520#discussion_r26969789 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java --- @@ -0,0 +1,903 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.complex; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.IterativeDataStream; +import org.apache.flink.streaming.api.datastream.SplitDataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.function.WindowMapFunction; +import org.apache.flink.streaming.api.function.co.CoMapFunction; +import org.apache.flink.streaming.api.function.sink.SinkFunction; +import org.apache.flink.streaming.api.function.source.SourceFunction; +import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.windowing.helper.Count; +import org.apache.flink.streaming.api.windowing.helper.Delta; +import org.apache.flink.streaming.api.windowing.helper.Time; +import org.apache.flink.streaming.api.windowing.helper.Timestamp; +import org.apache.flink.streaming.util.RectangleClass; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ComplexIntegrationTest implements Serializable { + private static final long serialVersionUID = 1L; + private static final long MEMORYSIZE = 32; + + private static Map<String, List<String>> results = new HashMap<String, List<String>>(); + + @SuppressWarnings("unchecked") + public static List<Tuple5<Integer, String, Character, Double, Boolean>> input = Arrays.asList( + new Tuple5<Integer, String, Character, Double, Boolean>(1, "apple", 'j', 0.1, false), + new Tuple5<Integer, String, Character, Double, Boolean>(1, "peach", 'b', 0.8, true), + new Tuple5<Integer, String, Character, Double, Boolean>(1, "orange", 'c', 0.7, false), + new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'd', 0.5, false), + new Tuple5<Integer, String, Character, Double, Boolean>(2, "apple", 'e', 0.6, false), + new Tuple5<Integer, String, Character, Double, Boolean>(3, "peach", 'a', 0.2, true), + new Tuple5<Integer, String, Character, Double, Boolean>(6, "peanut", 'b', 0.1, true), + new Tuple5<Integer, String, Character, Double, Boolean>(7, "banana", 'c', 0.4, false), + new Tuple5<Integer, String, Character, Double, Boolean>(8, "peanut", 'd', 0.2, false), + new Tuple5<Integer, String, Character, Double, Boolean>(10, "cherry", 'e', 0.1, false), + new Tuple5<Integer, String, Character, Double, Boolean>(10, "plum", 'a', 0.5, true), + new Tuple5<Integer, String, Character, Double, Boolean>(11, "strawberry", 'b', 0.3, false), + new Tuple5<Integer, String, Character, Double, Boolean>(11, "orange", 'c', 0.3, true), + new Tuple5<Integer, String, Character, Double, Boolean>(12, "lemon", 'd', 0.9, false), + new Tuple5<Integer, String, Character, Double, Boolean>(13, "apple", 'e', 0.7, false), + new Tuple5<Integer, String, Character, Double, Boolean>(15, "lemon", 'a', 0.2, false), + new Tuple5<Integer, String, Character, Double, Boolean>(16, "apple", 'b', 0.8, true), + new Tuple5<Integer, String, Character, Double, Boolean>(16, "banana", 'c', 0.8, false), + new Tuple5<Integer, String, Character, Double, Boolean>(16, "orange", 'd', 0.1, false), + new Tuple5<Integer, String, Character, Double, Boolean>(17, "cherry", 'e', 1.0, false)); + +// @Test +// public void complexIntegrationTest() throws Exception { +// +// MyTimestamp timestamp = new MyTimestamp(); +// List<String> resultList1 = new ArrayList<String>(); +// //results.put("resultList") +// +// StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); +// env.setBufferTimeout(10); +// +// DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()); +// DataStream<OuterPojo> sourceStream2 = env.addSource(new PojoSource()); +// DataStream<Long> sourceStream3 = env.generateSequence(1, 50); +// DataStream<Tuple5<Integer, String, Character, Double, Boolean>> sourceStream4 = env.fromCollection(input); +// DataStream<Long> sourceStream5 = env.fromElements(10L, 20L, 30L, 40L, 50L, 60L, 70L, -10L, -20L, -30L); +// +// //noinspection unchecked +// env.fromCollection(input) +// .groupBy(1).sum(3) +// .groupBy(2).window(Time.of(10, new MyTimestamp(), 0)).every(Time.of(5, new MyTimestamp(), 0)).max(3) +// .flatten() +// .merge(sourceStream4.filter(new MyFilterFunction()).distribute()) +// .map(new MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String, +// Double, Boolean>>() { +// +// @Override +// public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double, +// Boolean> value) throws Exception { +// return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value +// .f3, +// value.f4); +// } +// }).broadcast().flatMap(new MyFlatMapFunction()).connect(sourceStream2).map(new +// MyCoMapFunction()).addSink(new ToListSink(resultList1)); +// +// IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.sum(0).filter(new FilterFunction +// <Tuple2<Long, Tuple2<String, Long>>>() { +// +// @Override +// public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception { +// return value.f0 < 20; +// } +// }).iterate(1000); +// +// SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new +// MyOutputSelector()); +// it.closeWith(step.select("iterate")); +// +// step.select("firstOutput").print(); +// step.select("secondOutput").print(); +// +// IterativeDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> it2 = sourceStream4.project(3, 1) +// .types(Double.class, String +// .class).forward().map(new MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>, +// Integer, Double>>(){ +// +// @Override +// public Tuple3<List<Tuple2<Double, String>>, Integer, Double> map(Tuple2<Double, String> value) throws +// Exception { +// List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>(); +// list.add(value); +// return new Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, 1, value.f0); +// } +// }).iterate(5000); +// +// SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> boxing = +// it2.window(Count.of(2)) +// .mapWindow(new WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>, +// Tuple3<List<Tuple2<Double, String>>, Integer, Double>>() { +// +// @Override +// public void mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> values, +// Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws Exception { +// List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>(); +// int count = 0; +// double quantity = 0.0; +// +// for (Tuple3<List<Tuple2<Double, String>>, Integer, Double> value : values) { +// list.addAll(value.f0); +// count += value.f1; +// quantity += value.f2; +// } +// out.collect(new Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count, +// quantity)); +// +// } +// }) +//.flatten().split(new OutputSelector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>(){ +// +// @Override +// public Iterable<String> select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) { +// List<String> output = new ArrayList<String>(); +// if (value.f2 < 2) { +// output.add("iterate"); +// } else { +// output.add("output"); +// } +// return output; +// } +// }); +// +// it2.closeWith(boxing.select("iterate")); +// +// boxing.select("output").print(); +// boxing.select("iterate").print(); +// +// //sourceStream1.project(0).types(Long.class).print(); +// //sourceStream2.groupBy("f0").sum("f1").print(); +// +// env.execute(); +// +// System.out.println(resultList1); +// } + + @Test + public void complexIntegrationTest2() throws Exception { + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); + env.setBufferTimeout(10); + + DataStream<Tuple5<Integer, String, Character, Double, Boolean>> sourceStream1 = env.fromCollection(input); + DataStream<OuterPojo> sourceStream2 = env.addSource(new PojoSource()); + + //noinspection unchecked + sourceStream1.groupBy(1).sum(3) + .groupBy(2).window(Time.of(10, new MyTimestamp(), 0)).every(Time.of(4, new MyTimestamp(), 0)).max(3) + .flatten() + .merge(sourceStream1.filter(new MyFilterFunction()).distribute()) + .map(new MapFunction<Tuple5<Integer, String, Character, Double, Boolean>, Tuple4<Integer, String, + Double, Boolean>>() { + + @Override + public Tuple4<Integer, String, Double, Boolean> map(Tuple5<Integer, String, Character, Double, + Boolean> value) throws Exception { + return new Tuple4<Integer, String, Double, Boolean>(value.f0, value.f1 + "-" + value.f2, value + .f3, + value.f4); + } + }).broadcast().flatMap(new MyFlatMapFunction()).connect(sourceStream2).map(new + MyCoMapFunction()).addSink(new ToListSink<String>("test2")); + + env.execute(); + + String result = "water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, " + + "water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, " + + "water_melon-b, " + + "water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, water_melon-b, " + + "water_melon-b, " + + "peach-b, peach-b, peach-a, peach-a, peach-a, peach-a, peanut-b, peanut-b, plum-a, plum-a, orange-c, " + + "orange-c, " + + "orange-c, orange-c, apple-b, apple-b"; + + ArrayList<String> test2Results = new ArrayList<String>(Arrays.asList(result.split(", "))); + + Collections.sort(test2Results); + Collections.sort(results.get("test2")); + assertEquals(test2Results, results.get("test2")); + System.out.println(results.get("test2")); + } + + @Test + public void complexIntegrationTest3() throws Exception { + ArrayList<String> expected1 = new ArrayList<String>(); + for (int i = 0; i < 9; i++) { + expected1.add("(10,(a,1))"); + } + ArrayList<String> expected2 = new ArrayList<String>(); + for (int i = 0; i < 19; i++) { + expected2.add("(20,(a,1))"); + } + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); + env.setBufferTimeout(10); + + DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream = env.addSource(new TupleSource()); + IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream.sum(0).filter(new FilterFunction + <Tuple2<Long, Tuple2<String, Long>>>() { + + @Override + public boolean filter(Tuple2<Long, Tuple2<String, Long>> value) throws Exception { + return value.f0 < 20; + } + }).iterate(1000); + + SplitDataStream<Tuple2<Long, Tuple2<String, Long>>> step = it.map(new IncrementMap()).split(new + MyOutputSelector()); + it.closeWith(step.select("iterate")); + + step.select("firstOutput").addSink(new ToListSink<Tuple2<Long, Tuple2<String, Long>>>("test31")); + step.select("secondOutput").addSink(new ToListSink<Tuple2<Long, Tuple2<String, Long>>>("test32")); + + env.execute(); + + assertEquals(expected1, results.get("test31")); + assertEquals(expected2, results.get("test32")); + + } + +// non functioning +// @Test +// public void complexIntegrationTest4() throws Exception { +// StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); +// env.setBufferTimeout(10); +// +// DataStream<Tuple5<Integer, String, Character, Double, Boolean>> sourceStream = env.fromCollection(input); +// +// IterativeDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> it2 = sourceStream.project(3, 1) +// .types(Double.class, String +// .class).forward().map(new MapFunction<Tuple2<Double, String>, Tuple3<List<Tuple2<Double, String>>, +// Integer, Double>>(){ +// +// @Override +// public Tuple3<List<Tuple2<Double, String>>, Integer, Double> map(Tuple2<Double, String> value) throws +// Exception { +// List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>(); +// list.add(value); +// return new Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, 1, value.f0); +// } +// }).iterate(5000); +// +// SplitDataStream<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> boxing = +// it2.window(Count.of(2)) +// .mapWindow(new WindowMapFunction<Tuple3<List<Tuple2<Double, String>>, Integer, Double>, +// Tuple3<List<Tuple2<Double, String>>, Integer, Double>>() { +// +// @Override +// public void mapWindow(Iterable<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> +// values, +// Collector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>> out) throws +// Exception { +// List<Tuple2<Double, String>> list = new ArrayList<Tuple2<Double, String>>(); +// int count = 0; +// double quantity = 0.0; +// +// for (Tuple3<List<Tuple2<Double, String>>, Integer, Double> value : values) { +// list.addAll(value.f0); +// count += value.f1; +// quantity += value.f2; +// } +// out.collect(new Tuple3<List<Tuple2<Double, String>>, Integer, Double>(list, count, +// quantity)); +// +// } +// }) +// .flatten().split(new OutputSelector<Tuple3<List<Tuple2<Double, String>>, Integer, Double>>(){ +// +// @Override +// public Iterable<String> select(Tuple3<List<Tuple2<Double, String>>, Integer, Double> value) { +// List<String> output = new ArrayList<String>(); +// if (value.f2 < 2) { +// output.add("iterate"); +// } else { +// output.add("output"); +// } +// return output; +// } +// }); +// +// it2.closeWith(boxing.select("iterate")); +// +// boxing.select("output").print(); +// +// env.execute(); +// } + + @Test + public void complexIntegrationTest5() throws Exception { + ArrayList<String> expected = new ArrayList<String>(); + for (int i = 0; i < 20; i++) { + expected.add("((" + i / 2 + ",water_melon-b,2,pojo)," + (i % 2 + 1) + ")"); + expected.add("((1,a,1,tuple)," + (i + 1) + ")"); + } + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); + env.setBufferTimeout(10); + + DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()); + DataStream<OuterPojo> sourceStream2 = env.addSource(new PojoSource()); + + sourceStream1.shuffle().connect(sourceStream2.broadcast()).map(new StringifyCoMapFunction()).map(new + MapFunction<String, Tuple2<String, Short>>() { + + @Override + public Tuple2<String, Short> map(String value) throws Exception { + return new Tuple2<String, Short>(value, (short) 1); + } + + }).groupBy(0).sum(1).addSink(new ToListSink<Tuple2<String, Short>>("test5")); + + env.execute(); + + Collections.sort(expected); + Collections.sort(results.get("test5")); + + assertEquals(expected, results.get("test5")); + } + + @Test + public void complexIntegrationTest6() throws Exception { + //heavy traffic with maps and flatmaps + + ArrayList<String> expected1 = new ArrayList<String>(); + expected1.add("541"); + expected1.add("1223"); + expected1.add("1987"); + expected1.add("2741"); + expected1.add("10939"); + expected1.add("3571"); + expected1.add("4409"); + expected1.add("11927"); + expected1.add("5279"); + expected1.add("6133"); + expected1.add("12823"); + expected1.add("6997"); + expected1.add("7919"); + expected1.add("13763"); + expected1.add("8831"); + expected1.add("9733"); + expected1.add("14759"); + expected1.add("9973"); + expected1.add("15671"); + expected1.add("16673"); + expected1.add("17659"); + expected1.add("18617"); + expected1.add("19697"); + expected1.add("19997"); + + ArrayList<String> expected2 = new ArrayList<String>(); + for (int i = 2; i < 100; i++) { + expected2.add("(" + i + "," + 20000 / i + ")"); + } + for (int i = 19901; i <= 20000; i++) { + expected2.add("(" + i + "," + 20000 / i + ")"); + } + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE); + env.setBufferTimeout(1); + + DataStream<Long> sourceStream1 = env.generateSequence(1, 10000); + DataStream<Long> sourceStream2 = env.generateSequence(10001, 20000); + + //noinspection unchecked + sourceStream1.filter(new PrimeFilterFunction()) + .window(Count.of(100)) + .max(0).flatten() + .merge(sourceStream2.filter(new PrimeFilterFunction()) + .window(Count.of(100)) + .max(0).flatten()) + .addSink(new ToListSink<Long>("test61")); + sourceStream1.flatMap(new DivisorsFlatMapFunction()) + .merge(sourceStream2.flatMap(new DivisorsFlatMapFunction())).map(new MapFunction<Long, Tuple2<Long, + Integer>>() { + + @Override + public Tuple2<Long, Integer> map(Long value) throws Exception { + return new Tuple2<Long, Integer>(value, 1); + } + }) + .groupBy(0) + .sum(1) + .groupBy(0) + .window(Count.of(10000)).max(1).flatten() + .filter(new FilterFunction<Tuple2<Long, Integer>>() { + + @Override + public boolean filter(Tuple2<Long, Integer> value) throws Exception { + return value.f0 < 100 || value.f0 > 19900; + } + }) + .addSink(new ToListSink<Tuple2<Long, Integer>>("test62")); + + env.execute(); + + Collections.sort(expected1); + Collections.sort(expected2); + Collections.sort(results.get("test61")); + Collections.sort(results.get("test62")); + + assertEquals(expected1, results.get("test61")); + assertEquals(expected2, results.get("test62")); + + } + + @Test + public void complexIntegrationTest7() throws Exception { + + ArrayList<String> expected = new ArrayList<String>(); + expected.add("((100,100),0)"); + expected.add("((120,122),5)"); + expected.add("((121,125),6)"); + expected.add("((138,144),9)"); + expected.add("((139,147),10)"); + expected.add("((156,166),13)"); + expected.add("((157,169),14)"); + expected.add("((174,188),17)"); + expected.add("((175,191),18)"); + expected.add("((192,210),21)"); + expected.add("((193,213),22)"); + expected.add("((210,232),25)"); + expected.add("((211,235),26)"); + expected.add("((228,254),29)"); + expected.add("((229,257),30)"); + expected.add("((246,276),33)"); + expected.add("((247,279),34)"); + expected.add("((264,298),37)"); + expected.add("((265,301),38)"); + expected.add("((282,320),41)"); + expected.add("((283,323),42)"); + expected.add("((300,342),45)"); + expected.add("((301,345),46)"); + expected.add("((318,364),49)"); + expected.add("((319,367),50)"); + expected.add("((336,386),53)"); + expected.add("((337,389),54)"); + expected.add("((354,408),57)"); + expected.add("((355,411),58)"); + expected.add("((372,430),61)"); + expected.add("((373,433),62)"); + expected.add("((390,452),65)"); + expected.add("((391,455),66)"); + expected.add("((408,474),69)"); + expected.add("((409,477),70)"); + expected.add("((426,496),73)"); + expected.add("((427,499),74)"); + expected.add("((444,518),77)"); + expected.add("((445,521),78)"); + expected.add("((462,540),81)"); + expected.add("((463,543),82)"); + expected.add("((480,562),85)"); + expected.add("((481,565),86)"); + expected.add("((498,584),89)"); + expected.add("((499,587),90)"); + expected.add("((516,606),93)"); + expected.add("((517,609),94)"); + expected.add("((534,628),97)"); + expected.add("((535,631),98)"); + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32); + env.setBufferTimeout(100); + + env.addSource(new RectangleSource()).global() + .map(new RectangleMapFunction()) + .window(Delta.of(0.0, new DeltaFunction<Tuple2<RectangleClass, Integer>>() { + + @Override + public double getDelta(Tuple2<RectangleClass, Integer> oldDataPoint, Tuple2<RectangleClass, + Integer> newDataPoint) { + return (newDataPoint.f0.b - newDataPoint.f0.a) - (oldDataPoint.f0.b - oldDataPoint.f0.a); + } + }, new Tuple2<RectangleClass, Integer>(new RectangleClass(100, 100), 0))) + .mapWindow(new MyWindowMapFunction()) + .flatten() + .addSink(new ToListSink<Tuple2<RectangleClass, Integer>>("test7")); + + env.execute(); + + Collections.sort(expected); + Collections.sort(results.get("test7")); + assertEquals(expected, results.get("test7")); + } + + @Test + public void complexIntegrationTest8() throws Exception { + + ArrayList<String> expected = new ArrayList<String>(); + + StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32); + env.setBufferTimeout(1); + + SplitDataStream<Long> splittedStream1 = env.generateSequence(1, 10) + .map(new MapFunction<Long, Long>() { + + @Override + public Long map(Long value) throws Exception { + return value; + } + }) + .filter(new FilterFunction<Long>() { + + @Override + public boolean filter(Long value) throws Exception { + return true; + } + }) + .flatMap(new SquareFlatMapFunction()) + .split(new OutputSelector<Long>() { + + @Override + public Iterable<String> select(Long value) { + ArrayList<String> output = new ArrayList<String>(); + if(value < 8) { + output.add("first"); + } else { + output.add("second"); + } + return output; + } + }); + + //splittedStream1.select("second").print(); + + SplitDataStream<Long> splittedStream2 = splittedStream1.select("first").split(new OutputSelector<Long>() { + + @Override + public Iterable<String> select(Long value) { + ArrayList<String> output = new ArrayList<String>(); + if(value < 5) { + output.add("third"); + } else { + output.add("fourth"); + } + return output; + } + }); + + DataStream<Long> dataStream3 = splittedStream1.select("second").map(new MapFunction<Long, Long>() { + + @Override + public Long map(Long value) throws Exception { + return value; + } + }); + + dataStream3.print(); + splittedStream2.select("third").print(); + // BUG!!! + dataStream3.merge(splittedStream2.select("third")).print(); + + //splittedStream2.select("fourth").print(); + + //.addSink(new ToListSink<Long>("test8")); + + env.execute(); + + //Collections.sort(expected); + //Collections.sort(results.get("test8")); + //assertEquals(expected, results.get("test8")); + } + + public static class InnerPojo { + public Long f0; --- End diff -- Flink should also support stuff like java collections, date-types etc. We frequently see users on the ML using that stuff. If its easy to integrate into the test, I would suggest using such a type as well. > Add a complex integration test for Streaming API > ------------------------------------------------ > > Key: FLINK-1595 > URL: https://issues.apache.org/jira/browse/FLINK-1595 > Project: Flink > Issue Type: Task > Components: Streaming > Reporter: Gyula Fora > Assignee: Péter Szabó > Labels: Starter > > The streaming tests currently lack a sophisticated integration test that > would test many api features at once. > This should include different merging, partitioning, grouping, aggregation > types, as well as windowing and connected operators. > The results should be tested for correctness. > A test like this would help identifying bugs that are hard to detect by > unit-tests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)