Thanks for the clarification Gyula!

In that case, is it currently possible to make one of the two connected
streams stall until the other stream has produced at least one output
before it starts producing as well?
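One common way to approximate this is to buffer elements from the data side inside the co-function until the model side has delivered its first value. Below is a minimal plain-Java sketch of that buffering logic; the class and method names are illustrative assumptions, and in actual Flink code this would live in a CoFlatMapFunction's flatMap1/flatMap2 methods:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of the buffering idea behind "stalling" one input: elements from
// the data side are queued until the model side has produced at least one
// value. Plain Java simulating the logic a Flink CoFlatMapFunction
// (flatMap1/flatMap2) could implement; names are illustrative.
public class BufferingCoMap {
    private Double model = null;                    // shared state, starts out null
    private final Queue<Integer> pending = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();

    // corresponds to flatMap1: called for each new data element
    public void onData(int value) {
        if (model == null) {
            pending.add(value);                     // stall: buffer until a model exists
        } else {
            output.add("predict(" + value + ") with model " + model);
        }
    }

    // corresponds to flatMap2: called for each model update
    public void onModel(double newModel) {
        model = newModel;
        while (!pending.isEmpty()) {                // drain everything that was waiting
            onData(pending.poll());
        }
    }

    public List<String> getOutput() { return output; }

    public static void main(String[] args) {
        BufferingCoMap op = new BufferingCoMap();
        op.onData(1);        // buffered, no model yet
        op.onData(2);        // buffered
        op.onModel(0.5);     // model arrives, buffered data is drained
        op.onData(3);        // predicted immediately
        System.out.println(op.getOutput().size());  // prints 3
    }
}
```

Note that an unbounded buffer like this can grow without limit if the model side never produces, so in practice one would cap or spill it.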


On Mon, Nov 21, 2016 at 3:16 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi :)
>
> The execution of the connected functions (map1/map2 in this case) is not
> affected by the timestamps. In other words, it is pretty much arbitrary
> which input arrives at the CoMapFunction first.
>
> So I think you did everything correctly.
>
> Gyula
>
> Theodore Vasiloudis <theodoros.vasilou...@gmail.com> wrote (on Mon,
> 21 Nov 2016, 12:07):
>
>> Hello all,
>>
>> I was playing around with the IncrementalLearningSkeleton example [1] and
>> I had a couple of questions regarding the behavior of connected streams.
>>
>> In the example the elements are assigned timestamps, and there is a
>> stream, model, that produces Double[] elements by ingesting and
>> processing a stream of training Integer data points:
>>
>> DataStream<Double[]> model = trainingData
>>                 .assignTimestampsAndWatermarks(new LinearTimestamp())
>>                 .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
>>                 .apply(new PartialModelBuilder());
>>
>> The model stream is then connected to a newData stream, which lets us
>> use the constantly updated model to make predictions for the incoming
>> newData elements: a model variable is shared between the two map
>> functions of the CoMap class. This shared variable starts out as null
>> and is updated every time an element from the model stream arrives:
>>
>> DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
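The shared-model pattern described above can be sketched Flink-free as follows; the linear model and the -1 fallback are illustrative assumptions, not taken from the example's actual Predictor:

```java
// Plain-Java sketch (no Flink dependency) of the shared-model CoMap
// pattern: map2 stores the latest model built from the training stream,
// while map1 predicts with whatever model is currently available.
public class Predictor {
    private Double[] model = null;  // shared between map1 and map2; starts out null

    // map1: handle an element of the newData stream
    public int map1(int value) {
        if (model == null) {
            return -1;              // no model received yet: emit a default prediction
        }
        return (int) (value * model[0]);
    }

    // map2: handle an element of the model stream by replacing the shared model
    public Double[] map2(Double[] newModel) {
        model = newModel;
        return newModel;
    }

    public static void main(String[] args) {
        Predictor p = new Predictor();
        System.out.println(p.map1(5));   // prints -1 (model still null)
        p.map2(new Double[]{2.0});
        System.out.println(p.map1(5));   // prints 10
    }
}
```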
>>
>> My confusion comes from a slightly different approach [2] I tried,
>> without using timestamps or watermarks. There I simply create count
>> windows of, say, 100 elements, and use readTextFile to read in the
>> trainingData and newData:
>>
>> DataStream<ArrayList<Double>> model = trainingData
>>         .countWindowAll(100)
>>         .apply(new PartialModelBuilder());
>>
>> When I then connect the model stream to the newData stream, the map1
>> function of the CoMap never sees a non-null model: the map functions
>> seem to be executed in order, with map1 first executed for all the
>> newData elements and map2 then executed for all the model elements.
>>
>> So how does having or not having timestamps affect the behavior of the
>> connected stream?
>>
>> How would I handle such a case if the notion of timestamps does not
>> apply to my data? (Here I'm streaming historical data, whose order I
>> assume does not matter.)
>>
>>
>> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
>>
>> [2] https://gist.github.com/thvasilo/67bcb9370b03971f380ae43c4ae6e2d0
>>
>