Hi :)

The execution of the Connected functions (map1/map2 in this case) are not
affected by the timestamps. In other words it is pretty much arbitrary
which input arrives at the CoMapFunction first.

So I think you did everything correctly.

Gyula

Theodore Vasiloudis <theodoros.vasilou...@gmail.com> ezt írta (időpont:
2016. nov. 21., H, 12:07):

> Hello all,
>
> I was playing around with the the IncrementalLearningSkeleton example and
> I had a couple of questions regarding the behavior of connected streams.
>
> In the example the elements are assigned timestamps, and there is a
> stream, model, that produces
> Double[] elements by ingesting and processing a stream of training Integer
> data points.
>
> DataStream<Double[]> model = trainingData
>                 .assignTimestampsAndWatermarks(new LinearTimestamp())
>                 .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
>                 .apply(new PartialModelBuilder());
>
> The model stream is then connected onto a newData stream which allows us
> to use the
> constantly updated model stream to make predictions for the incoming
> stream of newData,
> by having a model variable shared between the two map functions in the
> coMap class.
> The shared model var is updated every time an element from the model
> stream arrives (starts
> out as null)
>
> DataStream<Integer> prediction = newData.connect(model).map(new Predictor
> ());
>
> My confusion comes when I tried a slightly different approach [2], without
> using timestamps
> or watermarks. In my example I simply create countWindows of say 100
> elements,
> and I use readTextFile to read in the trainingData and newData :
>
> DataStream<ArrayList<Double>> model = trainingData
>         .countWindowAll(100)
>         .apply(new PartialModelBuilder());
>
> When I then connect the model stream to the newData stream, the map1
> function of the
> comap never sees the model as not null, as it seems that the map functions
> are executed
> in order: first the map1 function is executed for all the newData
> elements, then the map2
> function is executed for all the model elements.
>
> So how does having or not having timestamps affect the behavior of the
> connected stream?
>
> How would I handle such a case if the notion of timestamps does not apply
> for my data?
> (i.e. here I'm interested in streaming historical data, I assume their
> order does not matter)
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
>
> [2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0
>

Reply via email to