Hi Aljoscha,

What I'm looking for is an operator that joins two streams together, but
keeps the events in timestamp order.

What I was trying to do with the window specification comes down to this: for
each event on that stream, I want to call a function with that event and
all of the events that arrived within 5 minutes after it. So conceptually it
is a sliding window, except that the width is defined in terms of time and
the step in terms of count (one event).
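
To make the intended semantics concrete, here is a minimal sketch in plain
Java (no Flink API involved). The class and method names are hypothetical,
and it assumes the timestamps are already sorted and that the upper bound of
the window is inclusive:

```java
import java.util.ArrayList;
import java.util.List;

final class ForwardWindows {
    /**
     * For each timestamp in an ascending list, builds the window made of
     * that timestamp plus every later timestamp at most `width` after it.
     * The window advances by one event at a time (count-based step).
     */
    static List<List<Long>> forwardWindows(List<Long> sortedTimestamps, long width) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < sortedTimestamps.size(); i++) {
            long start = sortedTimestamps.get(i);
            List<Long> window = new ArrayList<>();
            window.add(start);
            for (int j = i + 1; j < sortedTimestamps.size(); j++) {
                long t = sortedTimestamps.get(j);
                if (t - start > width) {
                    break; // input is sorted, so nothing later can match
                }
                window.add(t);
            }
            result.add(window);
        }
        return result;
    }
}
```

With the timestamps from the test quoted below (9, 10, 12, 14, 15, 17) and a
width of 5, the window for 9 would be [9, 10, 12, 14] and the window for 10
would be [10, 12, 14, 15].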

What I will really need to do is probably manage the window myself in
operator state (because in many cases I expect that I will not need the
whole window, so it may be interesting to be able to evaluate that
eagerly), but I think I really need the events to arrive in order.
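
One way to get in-order delivery when managing the buffer by hand is to hold
events in a priority queue and only release them once a watermark says that
no earlier event can still arrive. The sketch below is plain Java, not a
Flink operator: the class name and both methods are hypothetical, and wiring
it into operator state and real watermarks is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ReorderBuffer {
    // Pending timestamps, smallest first (stand-in for full events).
    private final PriorityQueue<Long> pending = new PriorityQueue<>();

    /** Buffers an event; arrival order does not matter. */
    void add(long timestamp) {
        pending.add(timestamp);
    }

    /**
     * Releases, in ascending order, every buffered timestamp that is not
     * later than the watermark. Later timestamps stay buffered.
     */
    List<Long> advanceWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            released.add(pending.poll());
        }
        return released;
    }
}
```

Feeding it the arrival order seen in the failing test (9, 15, 10, 14) and
then advancing the watermark to 14 would release 9, 10, 14 in that order,
with 15 held back until a later watermark.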

On 14 April 2016 at 17:04, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> Flink does not make any guarantees about the order of arriving elements
> except in the case of one-to-one forwarding patterns. That is, only for
> map/flatMap/filter and such operations will the order in which two
> successive operations see their elements be the same.
>
> Could you please describe in prose form what the expected outcome of your
> windowing specification is. We could start from this and try to figure out
> how to make Flink behave as it should.
>
> Cheers,
> Aljoscha
>
> On Thu, 14 Apr 2016 at 16:32 Gary Verhaegen <gary.verhae...@euranova.eu>
> wrote:
>
> > Hi list,
> >
> > I am surprised by the behaviour of the code below. In particular, I am
> > puzzled by the fact that events do not seem to enter the window in order.
> > What am I doing wrong?
> >
> > Here's what I don't understand. This test outputs the following error:
> >
> > java.lang.AssertionError: expected:<[[10 "Join(Left,Right)" {"Left:@t": 10,
> > "Left:attr": 1, "Right:@t": 9, "Right:val": 1}], [15 "Join(Left,Right)"
> > {"Left:@t": 14, "Left:attr": 1, "Right:@t": 15, "Right:val": 1}]]> but
> > was:<[[9 "None" {"times": [9]}], [12 "None" {"times": [12]}], [17 "None"
> > {"times": [17]}], [9 "None" {"times": [9, 15]}], [9 "None" {"times": [9,
> > 15, 10]}], [9 "None" {"times": [9, 15, 10, 14]}]]>
> >
> > Now, the test is not complete, so it's not surprising that it fails, but
> > what really puzzles me is that there appears to be a moment when my window
> > contains an event at time 9 and an event at time 15, but does not yet
> > include the events at times 10 and 14, which should be part of the same
> > stream (and are indeed added later).
> >
> > This code uses the 1.0.1 version of flink-java, flink-streaming-java_2.11
> > and flink-clients_2.11 (and junit 4.12), running under Java 8 with the
> > relevant parts of the pom.xml uncommented.
> >
> > package enx.cep;
> >
> > import static org.junit.Assert.assertEquals;
> >
> > import org.apache.flink.streaming.api.TimeCharacteristic;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
> > import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
> > import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> > import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
> > import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> > import org.apache.flink.util.Collector;
> > import org.junit.Test;
> >
> > import java.io.*;
> > import java.net.ServerSocket;
> > import java.net.Socket;
> > import java.net.SocketTimeoutException;
> > import java.util.*;
> > import java.util.function.Function;
> > import java.util.stream.Collectors;
> > import java.util.stream.StreamSupport;
> >
> > public class AlgebraTest {
> >     @Test public void flinkCanJoinTwoStreams() throws Exception {
> >         final List<Msg> inputs = list(
> >                 Msg(9, "Right", "val", 1),
> >                 Msg(10, "Left", "attr", 1),
> >                 Msg(12, "Left", "attr", 2),
> >                 Msg(14, "Left", "attr", 1),
> >                 Msg(15, "Right", "val", 1),
> >                 Msg(17, "Right", "val", 3));
> >         final List<Msg> expected = list(
> >                 Msg(10, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> >                         "Right:@t", 9, "Left:@t", 10),
> >                 Msg(15, "Join(Left,Right)", "Right:val", 1, "Left:attr", 1,
> >                         "Right:@t", 15, "Left:@t", 14));
> >         final List<Msg> output = runStreamAlg(inputs, source -> {
> >             final DataStream<Msg> RightSource =
> >                     source.filter(msg -> "Right".equals(msg.type));
> >             final DataStream<Msg> LeftSource =
> >                     source.filter(msg -> "Left".equals(msg.type));
> >
> >             // Join Left & Right streams on
> >             // Left.attr == Right.val && abs(Left.t - Right.t) < 5
> >             final DataStream<Msg> joined = LeftSource.union(RightSource)
> >                     .keyBy(msg -> {
> >                         if ("Right".equals(msg.type)) {
> >                             return msg.attrs.get("val");
> >                         } else if ("Left".equals(msg.type)) {
> >                             return msg.attrs.get("attr");
> >                         } else {
> >                             throw new RuntimeException();
> >                         }
> >                     })
> >                     .window(GlobalWindows.create())
> >                     .trigger(CountTrigger.of(1))
> >                     .apply(new WindowFunction<Msg, Msg, Object, GlobalWindow>() {
> >                         @Override
> >                         public void apply(Object _key, GlobalWindow _w,
> >                                 Iterable<Msg> ins, Collector<Msg> collector) throws Exception {
> >                             List<Integer> times =
> >                                     StreamSupport.stream(ins.spliterator(), false)
> >                                     .map(m -> m.timestamp)
> >                                     .collect(Collectors.toList());
> >
> >                             collector.collect(Msg(
> >                                     times.stream().mapToInt(i -> i).min().getAsInt(),
> >                                     "None", "times", times));
> >                         }
> >                     });
> >
> >             return joined;
> >         });
> >         assertEquals(expected, output);
> >     }
> >
> >     private final List<Msg> runStreamAlg(List<Msg> input,
> >             Function<DataStream<Msg>, DataStream<Msg>> fn) {
> >         final StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >         final DataStream<Msg> source = env.fromCollection(input)
> >                 .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Msg>() {
> >                     @Override
> >                     public long extractAscendingTimestamp(Msg msg) {
> >                         return msg.timestamp * 1000;
> >                     }
> >                 });
> >         final DataStream<Msg> transformed = fn.apply(source);
> >
> >         final List<Msg> res = new ArrayList<>();
> >         try (final ServerSocket server = new ServerSocket(0)) {
> >             final int serverPort = server.getLocalPort();
> >
> >             transformed.addSink(m -> {
> >                 try (final Socket client = new Socket("localhost", serverPort)) {
> >                     final ObjectOutputStream toServer =
> >                             new ObjectOutputStream(client.getOutputStream());
> >                     toServer.writeObject(m);
> >                     toServer.flush();
> >                     toServer.close();
> >                 }
> >             });
> >
> >             final Thread t = new Thread(() -> {
> >                 while (true) {
> >                     try (final ObjectInputStream in =
> >                             new ObjectInputStream(server.accept().getInputStream())) {
> >                         res.add((Msg) in.readObject());
> >                         server.setSoTimeout(500);
> >                     } catch (SocketTimeoutException e) {
> >                         return;
> >                     } catch (java.io.IOException | ClassNotFoundException e) {
> >                         throw new RuntimeException(e);
> >                     }
> >                 }
> >             });
> >             t.start();
> >             try {
> >                 env.execute();
> >             } catch (Exception e) {
> >                 e.printStackTrace();
> >             }
> >             t.join();
> >         } catch (Exception e) {
> >             throw new RuntimeException(e);
> >         }
> >         return res;
> >     }
> >
> >     private static <T> List<T> list(T elem, T... others) {
> >         final List<T> res = new ArrayList<>();
> >         res.add(elem);
> >         for(T t: others) {
> >             res.add(t);
> >         }
> >         return res;
> >     }
> >
> >     private static Msg Msg(int timestamp, String type, Object... attrs) {
> >         return new Msg(timestamp, type, attrs);
> >     }
> >
> >     private static class Msg implements Serializable {
> >         private final String type;
> >         private final int timestamp;
> >         private final Map<String, Object> attrs;
> >         public Msg(int timestamp, String type, Object... attrs) {
> >             this.timestamp = timestamp;
> >             this.type = type;
> >             this.attrs = new HashMap<>();
> >             if (attrs.length % 2 != 0) throw new IllegalArgumentException();
> >             for (int i = 0; i < attrs.length; i += 2) {
> >                 if (!(attrs[i] instanceof String)) throw new IllegalArgumentException();
> >                 this.attrs.put((String) attrs[i], attrs[i+1]);
> >             }
> >         }
> >
> >         public String toString() {
> >             return String.format("[%d \"%s\" {%s}]",
> >                     this.timestamp,
> >                     this.type,
> >                     this.attrs.entrySet().stream()
> >                             .sorted((e1, e2) -> e1.getKey().compareTo(e2.getKey()))
> >                             .map(e -> String.format("\"%s\": %s", e.getKey(), e.getValue()))
> >                             .reduce((acc, el) -> acc + ", " + el)
> >                             .orElseGet(() -> ""));
> >         }
> >     }
> > }
> >
>
