Hi,
Flink does not make any guarantees about the order of arriving elements
except in the case of one-to-one forwarding patterns. That is, only when
successive operations such as map/flatMap/filter are connected by one-to-one
forwarding will they see their elements in the same order.

Could you please describe in prose form what the expected outcome of your
windowing specification is. We could start from this and try to figure out
how to make Flink behave as it should.

Cheers,
Aljoscha

On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <gary.verhae...@euranova.eu>
wrote:

> Hi list,
>
> I am surprised by the behaviour of the code below. In particular, I am
> puzzled by the fact that events do not seem to enter the window in order.
> What am I doing wrong?
>
> Here's what I don't understand. This test outputs the following error:
>
> java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t":
> 10,
> "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
> {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
> was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
> {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
> 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>
>
> Now, the test is not complete, so it's not surprising that it fails, but
> what really puzzles me is that there appears to be a moment when my window
> contains an event at time 9 and an event at time 15, but does not yet
> include the events at times 10 and 14, which should be part of the same
> stream (and are indeed added later).
>
> This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
> and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
> relevant parts of the pom.xml uncommented.
>
> package enx.cep;
> import static org.junit.Assert.assertEquals;
> import org.apache.flink.streaming.api.TimeCharacteristic;import
> org.apache.flink.streaming.api.datastream.DataStream;import
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
> org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;import
> org.apache.flink.streaming.api.functions.windowing.WindowFunction;import
> org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import
> org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import
> org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import
> org.apache.flink.util.Collector;import org.junit.Test;
> import java.io.*;import java.net.ServerSocket;import
> java.net.Socket;import java.net.SocketTimeoutException;import
> java.util.*;
> import java.util.function.Function;import
> java.util.stream.Collectors;import java.util.stream.StreamSupport;
> public class AlgebraTest {
>     /** Poll interval of the result collector's accept loop, in milliseconds. */
>     private static final int ACCEPT_POLL_MILLIS = 100;
>
>     /**
>      * Joins a "Left" and a "Right" stream on
>      * {@code Left.attr == Right.val && abs(Left.t - Right.t) < 5}.
>      *
>      * <p>The window function below is still a placeholder: it emits the timestamps
>      * collected per key instead of joined records, so this test fails until the
>      * join itself is implemented.
>      */
>     @Test public void flinkCanJoinTwoStreams() throws Exception {
>         final List<Msg> inputs = list(
>                 Msg(9, "Right", "val", 1),
>                 Msg(10, "Left", "attr", 1),
>                 Msg(12, "Left", "attr", 2),
>                 Msg(14, "Left", "attr", 1),
>                 Msg(15, "Right", "val", 1),
>                 Msg(17, "Right", "val", 3));
>         final List<Msg> expected = list(
>                 Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
>                         "Right:@t", 9, "Left:@t", 10),
>                 Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
>                         "Right:@t", 15, "Left:@t", 14));
>         final List<Msg> output = runStreamAlg(inputs, source -> {
>             final DataStream<Msg> rightSource = source.filter(msg -> "Right".equals(msg.type));
>             final DataStream<Msg> leftSource = source.filter(msg -> "Left".equals(msg.type));
>
>             // Join Left & Right streams on
>             // Left.attr == Right.val && abs(Left.t - Right.t) < 5
>             //
>             // NOTE: elements reach the window in arrival order, not event-time order.
>             // Across union/keyBy the interleaving of the two inputs is not guaranteed,
>             // so a per-element trigger can fire on a window holding the events at 9
>             // and 15 before the events at 10 and 14 have arrived.
>             return leftSource.union(rightSource)
>                     .keyBy(msg -> {
>                         if ("Right".equals(msg.type)) {
>                             return msg.attrs.get("val");
>                         } else if ("Left".equals(msg.type)) {
>                             return msg.attrs.get("attr");
>                         } else {
>                             throw new IllegalArgumentException(
>                                     "Unexpected message type: " + msg.type);
>                         }
>                     })
>                     .window(GlobalWindows.create())
>                     // Fires once per element. Nothing here purges the window, so each
>                     // firing sees every element received so far for that key.
>                     .trigger(CountTrigger.of(1))
>                     .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
>                         @Override
>                         public void apply(Object key, GlobalWindow window,
>                                           Iterable<Msg> ins, Collector<Msg> collector)
>                                 throws Exception {
>                             // Timestamps in the order the window received them.
>                             final List<Integer> times =
>                                     StreamSupport.stream(ins.spliterator(), false)
>                                             .map(m -> m.timestamp)
>                                             .collect(Collectors.toList());
>                             // The trigger fires only after an element was added,
>                             // so the window is never empty here.
>                             final int earliest =
>                                     times.stream().mapToInt(Integer::intValue).min().getAsInt();
>                             collector.collect(Msg(earliest, "None", "times", times));
>                         }
>                     });
>         });
>         assertEquals(expected, output);
>     }
>
>     /**
>      * Builds an event-time pipeline over {@code input}, applies {@code fn} to it,
>      * runs the job and returns every element the resulting stream emits.
>      *
>      * <p>The sink ships each element to a local server socket, one connection per
>      * element. Results are listed in the order those connections are accepted.
>      *
>      * @param input the elements fed to the job, in order
>      * @param fn    the transformation under test
>      * @return the elements emitted by the transformed stream
>      * @throws RuntimeException if the Flink job fails or the results cannot be read
>      */
>     private final List<Msg> runStreamAlg(List<Msg> input,
>                                          Function<DataStream<Msg>, DataStream<Msg>> fn) {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         final DataStream<Msg> source = env.fromCollection(input)
>                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
>                     @Override
>                     public long extractAscendingTimestamp(Msg msg) {
>                         // Scaled by 1000 (presumably seconds -> milliseconds); long
>                         // arithmetic so large timestamps cannot overflow an int.
>                         return msg.timestamp * 1000L;
>                     }
>                 });
>         final DataStream<Msg> transformed = fn.apply(source);
>
>         final List<Msg> res = new ArrayList<>();
>         try (final ServerSocket server = new ServerSocket(0)) {
>             final int serverPort = server.getLocalPort();
>
>             transformed.addSink(m -> {
>                 try (final Socket client = new Socket("localhost", serverPort);
>                      final ObjectOutputStream toServer =
>                              new ObjectOutputStream(client.getOutputStream())) {
>                     toServer.writeObject(m);
>                 }
>             });
>
>             // Run the job on its own thread so this thread can drain the socket.
>             final Exception[] jobFailure = new Exception[1];
>             final Thread job = new Thread(() -> {
>                 try {
>                     env.execute();
>                 } catch (Exception e) {
>                     jobFailure[0] = e;
>                 }
>             }, "flink-job");
>
>             server.setSoTimeout(ACCEPT_POLL_MILLIS);
>             job.start();
>             while (true) {
>                 // Sample before accept(). Every sink connection is established before
>                 // env.execute() returns, so if the job had already finished, a timeout
>                 // on this accept() means no connection is left to read.
>                 final boolean jobDone = !job.isAlive();
>                 try (final Socket conn = server.accept();
>                      final ObjectInputStream in = new ObjectInputStream(conn.getInputStream())) {
>                     res.add((Msg) in.readObject());
>                 } catch (SocketTimeoutException e) {
>                     if (jobDone) {
>                         break;
>                     }
>                 }
>             }
>             job.join();
>             if (jobFailure[0] != null) {
>                 throw new RuntimeException("Flink job failed", jobFailure[0]);
>             }
>         } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>             throw new RuntimeException(e);
>         } catch (IOException | ClassNotFoundException e) {
>             throw new RuntimeException(e);
>         }
>         return res;
>     }
>
>     /** Returns a new mutable list holding {@code elem} followed by {@code others}. */
>     @SafeVarargs
>     private static <T> List<T> list(T elem, T... others) {
>         final List<T> res = new ArrayList<>(others.length + 1);
>         res.add(elem);
>         Collections.addAll(res, others);
>         return res;
>     }
>
>     /** Shorthand factory for {@link Msg}. */
>     private static Msg Msg(int timestamp, String type, Object... attrs) {
>         return new Msg(timestamp, type, attrs);
>     }
>
>     /**
>      * A test event with an integer timestamp, a type tag and named attributes.
>      * Value-based {@code equals}/{@code hashCode} let {@code assertEquals} compare
>      * expected and actual lists element by element. Without them it would compare
>      * object identity and could never succeed.
>      */
>     private static final class Msg implements Serializable {
>         private final String type;
>         private final int timestamp;
>         private final Map<String, Object> attrs;
>
>         /**
>          * @param timestamp event timestamp
>          * @param type      type tag, e.g. "Left" or "Right"
>          * @param attrs     alternating attribute names (Strings) and values
>          * @throws IllegalArgumentException if {@code attrs} has odd length or a name
>          *                                  is not a String
>          */
>         public Msg(int timestamp, String type, Object... attrs) {
>             if (attrs.length % 2 != 0) {
>                 throw new IllegalArgumentException(
>                         "attrs must alternate names and values, got " + attrs.length + " items");
>             }
>             this.timestamp = timestamp;
>             this.type = type;
>             this.attrs = new HashMap<>();
>             for (int i = 0; i < attrs.length; i += 2) {
>                 if (!(attrs[i] instanceof String)) {
>                     throw new IllegalArgumentException(
>                             "attribute name at index " + i + " is not a String: " + attrs[i]);
>                 }
>                 this.attrs.put((String) attrs[i], attrs[i + 1]);
>             }
>         }
>
>         @Override
>         public boolean equals(Object o) {
>             if (this == o) {
>                 return true;
>             }
>             if (!(o instanceof Msg)) {
>                 return false;
>             }
>             final Msg other = (Msg) o;
>             return timestamp == other.timestamp
>                     && Objects.equals(type, other.type)
>                     && attrs.equals(other.attrs);
>         }
>
>         @Override
>         public int hashCode() {
>             return Objects.hash(timestamp, type, attrs);
>         }
>
>         /** Renders as {@code [ts "type" {"k1": v1, "k2": v2}]} with keys sorted. */
>         @Override
>         public String toString() {
>             final String renderedAttrs = attrs.entrySet().stream()
>                     .sorted(Map.Entry.comparingByKey())
>                     .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
>                     .collect(Collectors.joining(", "));
>             return String.format("[%d \"%s\" {%s}]", timestamp, type, renderedAttrs);
>         }
>     }
> }
>

Reply via email to