Hi Aljoscha,

What I'm looking for is an operator that joins two streams together, but keeps the events in timestamp order.
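
A minimal sketch of that behaviour, in plain Java rather than against any Flink API: events from both inputs are buffered as they arrive and only released, sorted by timestamp, once a watermark says nothing earlier can still show up. The `OrderedBuffer` and `Event` names are hypothetical and exist only for this illustration; the arrival order 9, 15, 10, 14 is the one reported in the quoted test output further down.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, NOT part of Flink: buffers events from several inputs
// and releases them in timestamp order once a watermark has passed them.
final class OrderedBuffer {

    // Minimal event: a timestamp plus the name of the stream it came from.
    static final class Event {
        final long timestamp;
        final String type;

        Event(long timestamp, String type) {
            this.timestamp = timestamp;
            this.type = type;
        }

        @Override
        public String toString() {
            return type + "@" + timestamp;
        }
    }

    // Min-heap on timestamp, so the earliest buffered event is always on top.
    private final PriorityQueue<Event> pending =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    // Accept an event in whatever order it happens to arrive.
    void add(Event e) {
        pending.add(e);
    }

    // Release, in timestamp order, every buffered event with timestamp <= watermark.
    List<Event> advanceWatermark(long watermark) {
        List<Event> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        OrderedBuffer buf = new OrderedBuffer();
        // Out-of-order arrival: both "Right" events show up before the "Left" ones.
        buf.add(new Event(9, "Right"));
        buf.add(new Event(15, "Right"));
        buf.add(new Event(10, "Left"));
        buf.add(new Event(14, "Left"));
        System.out.println(buf.advanceWatermark(14));             // everything up to t=14
        System.out.println(buf.advanceWatermark(Long.MAX_VALUE)); // whatever is left
    }
}
```

The cost of this approach is latency: nothing is emitted until the watermark advances, and the ordering is only as good as the watermark itself.
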
What I was trying to do with the window specification comes down to: for each event on that stream, I want to call this function with this event and all of the events that arrived within 5 minutes after it. So conceptually a sliding window, except that the width is defined in terms of time and the step in terms of count.

What I will really need to do is probably manage the window myself in operator state (because in many cases I expect that I will not need the whole window, so it may be interesting to be able to evaluate that eagerly), but I think I really need the events to arrive in order.

On 14 April 2016 at 17:04, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Flink does not make any guarantees about the order of arriving elements
> except in the case of one-to-one forwarding patterns. That is, only for
> map/flatMap/filter and such operations will the order in which two
> successive operations see their elements be the same.
>
> Could you please describe in prose form what the expected outcome of your
> windowing specification is. We could start from this and try to figure out
> how to make Flink behave as it should.
>
> Cheers,
> Aljoscha
>
> On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <gary.verhae...@euranova.eu>
> wrote:
>
> > Hi list,
> >
> > I am surprised by the behaviour of the code below. In particular, I am
> > puzzled by the fact that events do not seem to enter the window in order.
> > What am I doing wrong?
> >
> > Here's what I don't understand.
> > This test outputs the following error:
> >
> > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10,
> > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
> > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
> > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
> > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
> > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>
> >
> > Now, the test is not complete, so it's not surprising that it fails, but
> > what really puzzles me is that there appears to be a moment when my window
> > contains an event at time 9 and an event at time 15, but does not yet
> > include the events at times 10 and 14, which should be part of the same
> > stream (and are indeed added later).
> >
> > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
> > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
> > relevant parts of the pom.xml uncommented.
> >
> > package enx.cep;
> >
> > import static org.junit.Assert.assertEquals;
> >
> > import org.apache.flink.streaming.api.TimeCharacteristic;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
> > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> > import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> > import org.apache.flink.util.Collector;
> > import org.junit.Test;
> >
> > import java.io.*;
> > import java.net.ServerSocket;
> > import java.net.Socket;
> > import java.net.SocketTimeoutException;
> > import java.util.*;
> > import java.util.function.Function;
> > import java.util.stream.Collectors;
> > import java.util.stream.StreamSupport;
> >
> > public class AlgebraTest {
> >     @Test
> >     public void flinkCanJoinTwoStreams() throws Exception {
> >         final List<Msg> inputs = list(
> >                 Msg(9, "Right", "val", 1),
> >                 Msg(10, "Left", "attr", 1),
> >                 Msg(12, "Left", "attr", 2),
> >                 Msg(14, "Left", "attr", 1),
> >                 Msg(15, "Right", "val", 1),
> >                 Msg(17, "Right", "val", 3));
> >         final List<Msg> expected = list(
> >                 Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> >                         "Right:@t", 9, "Left:@t", 10),
> >                 Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> >                         "Right:@t", 15, "Left:@t", 14));
> >         final List<Msg> output = runStreamAlg(inputs, source -> {
> >             final DataStream<Msg> RightSource = source.filter(msg -> "Right".equals(msg.type));
> >             final DataStream<Msg> LeftSource = source.filter(msg -> "Left".equals(msg.type));
> >
> >             // Join Left & Right streams on
> >             // Left.attr == Right.val && abs(Left.t - Right.t) < 5
> >             final DataStream<Msg> joined = LeftSource.union(RightSource)
> >                     .keyBy(msg -> {
> >                         if ("Right".equals(msg.type)) {
> >                             return msg.attrs.get("val");
> >                         } else if ("Left".equals(msg.type)) {
> >                             return msg.attrs.get("attr");
> >                         } else {
> >                             throw new RuntimeException();
> >                         }
> >                     })
> >                     .window(GlobalWindows.create())
> >                     .trigger(CountTrigger.of(1))
> >                     .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
> >                         @Override
> >                         public void apply(Object _key, GlobalWindow _w, Iterable<Msg> ins,
> >                                 Collector<Msg> collector) throws Exception {
> >                             List<Integer> times = StreamSupport.stream(ins.spliterator(), false)
> >                                     .map(m -> m.timestamp)
> >                                     .collect(Collectors.toList());
> >                             collector.collect(Msg(
> >                                     times.stream().mapToInt(i -> i).min().getAsInt(),
> >                                     "None", "times", times));
> >                         }
> >                     });
> >
> >             return joined;
> >         });
> >         assertEquals(expected, output);
> >     }
> >
> >     private final List<Msg> runStreamAlg(List<Msg> input,
> >             Function<DataStream<Msg>, DataStream<Msg>> fn) {
> >         final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >         final DataStream<Msg> source = env.fromCollection(input)
> >                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
> >                     @Override
> >                     public long extractAscendingTimestamp(Msg msg) {
> >                         return msg.timestamp * 1000;
> >                     }
> >                 });
> >         final DataStream<Msg> transformed = fn.apply(source);
> >
> >         final List<Msg> res = new ArrayList<>();
> >         try (final ServerSocket server = new ServerSocket(0)) {
> >             final int serverPort = server.getLocalPort();
> >
> >             transformed.addSink(m -> {
> >                 try (final Socket client = new Socket("localhost", serverPort)) {
> >                     final ObjectOutputStream toServer =
> >                             new ObjectOutputStream(client.getOutputStream());
> >                     toServer.writeObject(m);
> >                     toServer.flush();
> >                     toServer.close();
> >                 }
> >             });
> >
> >             final Thread t = new Thread(() -> {
> >                 while (true) {
> >                     try (final ObjectInputStream in =
> >                             new ObjectInputStream(server.accept().getInputStream())) {
> >                         res.add((Msg) in.readObject());
> >                         server.setSoTimeout(500);
> >                     } catch (SocketTimeoutException e) {
> >                         return;
> >                     } catch (java.io.IOException | ClassNotFoundException e) {
> >                         throw new RuntimeException(e);
> >                     }
> >                 }
> >             });
> >             t.start();
> >             try {
> >                 env.execute();
> >             } catch (Exception e) {
> >                 e.printStackTrace();
> >             }
> >             t.join();
> >         } catch (Exception e) {
> >             throw new RuntimeException(e);
> >         }
> >         return res;
> >     }
> >
> >     private static <T> List<T> list(T elem, T... others) {
> >         final List<T> res = new ArrayList<>();
> >         res.add(elem);
> >         for (T t : others) {
> >             res.add(t);
> >         }
> >         return res;
> >     }
> >
> >     private static Msg Msg(int timestamp, String type, Object... attrs) {
> >         return new Msg(timestamp, type, attrs);
> >     }
> >
> >     private static class Msg implements Serializable {
> >         private final String type;
> >         private final int timestamp;
> >         private final Map<String, Object> attrs;
> >
> >         public Msg(int timestamp, String type, Object... attrs) {
> >             this.timestamp = timestamp;
> >             this.type = type;
> >             this.attrs = new HashMap<>();
> >             if (attrs.length % 2 != 0) throw new IllegalArgumentException();
> >             for (int i = 0; i < attrs.length; i += 2) {
> >                 if (!(attrs[i] instanceof String)) throw new IllegalArgumentException();
> >                 this.attrs.put((String) attrs[i], attrs[i+1]);
> >             }
> >         }
> >
> >         public String toString() {
> >             return String.format("[%d \"%s\" {%s}]",
> >                     this.timestamp,
> >                     this.type,
> >                     this.attrs.entrySet().stream()
> >                             .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey()))
> >                             .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
> >                             .reduce((acc, el) -> acc + ", " + el)
> >                             .orElseGet(() -> ""));
> >         }
> >     }
> > }
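
The windowing semantics described at the top of this message (each event together with everything that arrives within a fixed span after it) can be sketched outside Flink as follows. This is plain Java, not a Flink operator. The class name `LookAheadWindows`, the inclusive upper bound, and the precondition that timestamps are already sorted are illustrative assumptions; the last one is exactly why in-order arrival matters here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, NOT Flink API: one window per event, holding that
// event plus every later event within a fixed look-ahead.
final class LookAheadWindows {

    // Precondition (assumed): timestamps are sorted in ascending order.
    // The bound is inclusive: an event exactly `width` later is still included.
    static List<List<Long>> windows(List<Long> timestamps, long width) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < timestamps.size(); i++) {
            long start = timestamps.get(i);
            List<Long> window = new ArrayList<>();
            // Step is "one event" (count-based); width is time-based.
            for (int j = i; j < timestamps.size()
                    && timestamps.get(j) - start <= width; j++) {
                window.add(timestamps.get(j));
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps from the test inputs, with a look-ahead of 5 time units.
        List<Long> ts = Arrays.asList(9L, 10L, 12L, 14L, 15L, 17L);
        for (List<Long> w : windows(ts, 5L)) {
            System.out.println(w);
        }
    }
}
```
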