Hi list, I am surprised by the behaviour of the code below. In particular, I am puzzled by the fact that events do not seem to enter the window in order. What am I doing wrong?
Here's what I don't understand. This test outputs the following error: java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10, "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)" {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None" {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9, 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]> Now, the test is not complete, so it's not surprising that it fails, but what really puzzles me is that there appears to be a moment when my window contains an event at time 9 and an event at time 15, but does not yet include the events at times 10 and 14, which should be part of the same stream (and are indeed added later). This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11 and flink-clients_2.11 (and junit 4.12), running under Java 8 with the relevant parts of the pom.xml uncommented. package enx.cep; import static org.junit.Assert.assertEquals; import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import org.apache.flink.util.Collector;import org.junit.Test; import java.io.*;import java.net.ServerSocket;import java.net.Socket;import java.net.SocketTimeoutException;import java.util.*; import java.util.function.Function;import java.util.stream.Collectors;import java.util.stream.StreamSupport; public class AlgebraTest { @Test public void flinkCanJoinTwoStreams() throws Exception { final List<Msg> inputs = list( Msg(9, "Right", "val", 1), Msg(10, "Left", "attr", 1), Msg(12, "Left", "attr", 2), Msg(14, "Left", "attr", 1), Msg(15, "Right", "val", 1), Msg(17, "Right", "val", 3)); final List<Msg> expected = list( Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, "Right:@t", 9, "Left:@t", 10), Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1, "Right:@t", 15, "Left:@t", 14)); final List<Msg> output = runStreamAlg(inputs, source -> { final DataStream<Msg> RightSource = source.filter(msg -> "Right".equals(msg.type)); final DataStream<Msg> LeftSource = source.filter(msg -> "Left".equals(msg.type)); // Join Left & Right streams on // Left.attr == Right.val && abs(Left.t - Right.t) < 5 final DataStream<Msg> joined = LeftSource.union(RightSource) .keyBy(msg -> { if ("Right".equals(msg.type)) { return msg.attrs.get("val"); } else if ("Left".equals(msg.type)) { return msg.attrs.get("attr"); } else { throw new RuntimeException(); } }) .window(GlobalWindows.create()) .trigger(CountTrigger.of(1)) .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() { @Override public void apply(Object _key, GlobalWindow _w, Iterable<Msg> ins, Collector<Msg> collector) throws Exception { List<Integer> times = StreamSupport.stream(ins.spliterator(), false) .map(m -> m.timestamp) .collect(Collectors.toList()); collector.collect(Msg(times.stream().mapToInt(i -> i).min().getAsInt(), "None", "times", times)); } }); return joined; }); assertEquals(expected, output); } private final List<Msg> runStreamAlg(List<Msg> input, Function<DataStream<Msg>, DataStream<Msg>> fn) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); final DataStream<Msg> source = env.fromCollection(input) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() { @Override public long extractAscendingTimestamp(Msg msg) { return msg.timestamp * 1000; } }); final DataStream<Msg> transformed = fn.apply(source); final List<Msg> res = new ArrayList<>(); try (final ServerSocket server = new ServerSocket(0)) { final int serverPort = server.getLocalPort(); transformed.addSink(m -> { try (final Socket client = new Socket("localhost", serverPort)) { final ObjectOutputStream toServer = new ObjectOutputStream(client.getOutputStream()); toServer.writeObject(m); toServer.flush(); toServer.close(); } }); final Thread t = new Thread(() -> { while (true) { try (final ObjectInputStream in = new ObjectInputStream(server.accept().getInputStream())) { res.add((Msg) in.readObject()); server.setSoTimeout(500); } catch (SocketTimeoutException e) { return; } catch (java.io.IOException | ClassNotFoundException e) { throw new RuntimeException(e); } } }); t.start(); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } t.join(); } catch (Exception e) { throw new RuntimeException(e); } return res; } private static <T> List<T> list(T elem, T... others) { final List<T> res = new ArrayList<>(); res.add(elem); for(T t: others) { res.add(t); } return res; } private static Msg Msg(int timestamp, String type, Object... attrs) { return new Msg(timestamp, type, attrs); } private static class Msg implements Serializable { private final String type; private final int timestamp; private final Map<String, Object> attrs; public Msg(int timestamp, String type, Object... attrs) { this.timestamp = timestamp; this.type = type; this.attrs = new HashMap<>(); if (attrs.length % 2 != 0) throw new IllegalArgumentException(); for (int i = 0; i < attrs.length; i += 2) { if (!(attrs[i] instanceof String)) throw new IllegalArgumentException(); this.attrs.put((String) attrs[i], attrs[i+1]); } } public String toString() { return String.format("[%d \"%s\" {%s}]", this.timestamp, this.type, this.attrs.entrySet().stream() .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey())) .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue())) .reduce((acc, el) -> acc + ", " + el) .orElseGet(() -> "")); } }}