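Hi :)

The execution of the connected functions (map1/map2 in this case) is not affected by the timestamps. Timestamps and watermarks drive time-based operations such as event-time windows, but they do not reorder the elements that reach a connected stream. In other words, it is pretty much arbitrary which input arrives at the CoMapFunction first.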
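To make the ordering point concrete, here is a plain-Java sketch with no Flink dependency. It mimics the two-method shape of a CoMapFunction (map1 for the first input, map2 for the second) using a hypothetical SharedModelPredictor class with a shared model field that starts out as null, and replays the same elements in two different arrival orders. The class names and the prediction logic (returning -1 when no model has been seen) are assumptions for illustration only; the "data first" run reproduces the situation described below, where every map1 call sees a null model.

```java
import java.util.ArrayList;
import java.util.List;

public class ArrivalOrderDemo {

    // Hypothetical stand-in for a CoMapFunction<Integer, Double[], Integer>:
    // map1 handles newData, map2 handles model updates. This is NOT Flink's API.
    static class SharedModelPredictor {
        private Double[] model = null; // starts out as null, as in the example

        // Called for each element of the first (newData) input.
        Integer map1(Integer value) {
            if (model == null) {
                return -1; // hypothetical marker: no model seen yet
            }
            // Hypothetical prediction: scale the input by the first weight.
            return (int) Math.round(value * model[0]);
        }

        // Called for each element of the second (model) input.
        Integer map2(Double[] newModel) {
            model = newModel;
            return 0; // placeholder output for model updates
        }
    }

    // Replays both inputs: all models before all data if modelFirst, else the reverse.
    static List<Integer> run(List<Integer> newData, List<Double[]> models, boolean modelFirst) {
        SharedModelPredictor predictor = new SharedModelPredictor();
        List<Integer> predictions = new ArrayList<>();
        if (modelFirst) {
            for (Double[] m : models) predictor.map2(m);
            for (Integer x : newData) predictions.add(predictor.map1(x));
        } else {
            for (Integer x : newData) predictions.add(predictor.map1(x));
            for (Double[] m : models) predictor.map2(m);
        }
        return predictions;
    }

    public static void main(String[] args) {
        List<Integer> newData = List.of(1, 2, 3);
        // Built with add() because List.of(new Double[]{...}) would be treated as varargs.
        List<Double[]> models = new ArrayList<>();
        models.add(new Double[] {2.0});
        System.out.println("model first: " + run(newData, models, true));  // [2, 4, 6]
        System.out.println("data first:  " + run(newData, models, false)); // [-1, -1, -1]
    }
}
```

The function itself is identical in both runs; only the arrival order differs, which is exactly the part that timestamps do not control.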
So I think you did everything correctly.

Gyula

Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote (Mon, 21 Nov 2016, 12:07):

> Hello all,
>
> I was playing around with the IncrementalLearningSkeleton example [1] and
> I had a couple of questions regarding the behavior of connected streams.
>
> In the example the elements are assigned timestamps, and there is a
> stream, model, that produces Double[] elements by ingesting and processing
> a stream of training Integer data points.
>
>     DataStream<Double[]> model = trainingData
>         .assignTimestampsAndWatermarks(new LinearTimestamp())
>         .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
>         .apply(new PartialModelBuilder());
>
> The model stream is then connected to a newData stream, which allows us
> to use the constantly updated model stream to make predictions for the
> incoming stream of newData, by having a model variable shared between the
> two map functions in the coMap class.
> The shared model var is updated every time an element from the model
> stream arrives (it starts out as null).
>
>     DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
>
> My confusion comes from when I tried a slightly different approach [2],
> without using timestamps or watermarks. In my example I simply create
> countWindows of, say, 100 elements, and I use readTextFile to read in the
> trainingData and newData:
>
>     DataStream<ArrayList<Double>> model = trainingData
>         .countWindowAll(100)
>         .apply(new PartialModelBuilder());
>
> When I then connect the model stream to the newData stream, the map1
> function of the coMap never sees a non-null model, as it seems that the
> map functions are executed in order: first the map1 function is executed
> for all the newData elements, then the map2 function is executed for all
> the model elements.
>
> So how does having or not having timestamps affect the behavior of the
> connected stream?
>
> How would I handle such a case if the notion of timestamps does not apply
> to my data? (i.e. here I'm interested in streaming historical data, I
> assume their order does not matter)
>
> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
> [2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0
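On the question of handling the case where timestamps do not apply: one common workaround (an assumption here, not something stated in the reply) is to make the two-input function tolerant of arrival order by buffering newData elements while no model has arrived yet, and flushing the buffer when the first model shows up. The sketch below is plain Java with a hypothetical BufferingPredictor class that only mirrors the flatMap1/flatMap2 shape of Flink's CoFlatMapFunction (a flat-map shape fits because a flush emits several results at once); a java.util.function.Consumer stands in for Flink's Collector. It is not the Flink API itself. In a real job the buffer would also need to be kept in Flink state to survive failures, and it grows without bound if a model never arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BufferingPredictorDemo {

    // Hypothetical stand-in for a CoFlatMapFunction<Integer, Double[], Integer>.
    // The Consumer plays the role of Flink's Collector. This is NOT Flink's API.
    static class BufferingPredictor {
        private Double[] model = null;
        private final List<Integer> pending = new ArrayList<>(); // newData seen before any model

        // First input (newData): predict now, or buffer until a model exists.
        void flatMap1(Integer value, Consumer<Integer> out) {
            if (model == null) {
                pending.add(value);
            } else {
                out.accept(predict(value));
            }
        }

        // Second input (model): store the update, then flush anything buffered.
        void flatMap2(Double[] newModel, Consumer<Integer> out) {
            model = newModel;
            for (Integer value : pending) {
                out.accept(predict(value));
            }
            pending.clear();
        }

        // Hypothetical prediction: scale the input by the first weight.
        private int predict(Integer value) {
            return (int) Math.round(value * model[0]);
        }
    }

    public static void main(String[] args) {
        BufferingPredictor predictor = new BufferingPredictor();
        List<Integer> out = new ArrayList<>();
        // Worst case from the question: all newData arrives before the model.
        for (int x : List.of(1, 2, 3)) {
            predictor.flatMap1(x, out::add);
        }
        predictor.flatMap2(new Double[] {2.0}, out::add);
        predictor.flatMap1(4, out::add); // arrives after the model, predicted immediately
        System.out.println(out); // [2, 4, 6, 8]
    }
}
```

With this shape, the "data first" order from the question no longer loses predictions; they are just delayed until the first model update arrives.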