Dear Friends,
I have two streams with the following data types:
DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
DataStream<Tuple2<String, Double>> unionReloadsStream;
Both streams consume data from Kafka, but at different rates:
"unionReloadsStream" receives more data than "splittedActivationTuple".
I need to keep "splittedActivationTuple" in a 24-hour window and update
its "Double" field whenever a matching element arrives from
unionReloadsStream (the String field is the common key).
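To pin down the intended semantics, here is a minimal plain-Java model of the behaviour described above, with no Flink involved: activations are stored per String key, a matching reload adds its Double to the stored Double, and activations more than 24 hours old are dropped. The class `ActivationAccumulator`, its methods, the caller-supplied millisecond timestamps, and the choice to replace an existing activation with the same key are all hypothetical, chosen only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model (not Flink API) of the intended behaviour.
class ActivationAccumulator {
    // Retention period: 24 hours in milliseconds
    private static final long RETENTION_MS = 24L * 60 * 60 * 1000;

    // Stored state for one activation: its Integer field, running Double, arrival time
    private static final class Activation {
        final int intField;
        double value;
        final long arrivedAtMs;

        Activation(int intField, double value, long arrivedAtMs) {
            this.intField = intField;
            this.value = value;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    private final Map<String, Activation> byKey = new HashMap<>();

    // Store the activation for this key (assumption: a newer one replaces an older one)
    void onActivation(String key, int intField, double value, long nowMs) {
        evictExpired(nowMs);
        byKey.put(key, new Activation(intField, value, nowMs));
    }

    // Add the reload amount to a live matching activation;
    // returns the updated Double, or null if no activation matches
    Double onReload(String key, double amount, long nowMs) {
        evictExpired(nowMs);
        Activation a = byKey.get(key);
        if (a == null) {
            return null;
        }
        a.value += amount;
        return a.value;
    }

    // Drop activations that arrived more than 24 hours before nowMs
    private void evictExpired(long nowMs) {
        byKey.values().removeIf(a -> nowMs - a.arrivedAtMs > RETENTION_MS);
    }
}
```

With an activation ("A", 1, 10.0) at time 0, a reload ("A", 5.0) one second later yields 15.0; the same reload arriving more than 24 hours after the activation finds nothing, because the activation has already been evicted.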
I wrote the following method for this task:
public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
        DataStream<Tuple3<String, Integer, Double>> activationsStream,
        DataStream<Tuple2<String, Double>> unifiedReloadStream) {
    // Join both streams on their common String key (selectors defined elsewhere)
    return activationsStream.join(unifiedReloadStream)
            .where(new ActivationStreamSelector())
            .equalTo(new ReloadStreamSelector())
            // All elements go into a single global window; TimeEvictor keeps
            // only elements within 24 hours of the newest one
            .window(GlobalWindows.create())
            .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
            .apply(new JoinFunction<Tuple3<String, Integer, Double>,
                    Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                private static final long serialVersionUID = 1L;

                // For each matching pair, add the reload's Double to the activation's Double
                @Override
                public Tuple3<String, Integer, Double> join(
                        Tuple3<String, Integer, Double> first,
                        Tuple2<String, Double> second) {
                    return new Tuple3<String, Integer, Double>(
                            first.f0, first.f1, first.f2 + second.f1);
                }
            });
}
and I call it like this:
DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
activationWindowStream.print();
But nothing is printed.
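One thing worth checking, stated as a reading of the Flink documentation rather than a confirmed diagnosis: the `GlobalWindows` assigner's default trigger never fires, so a join windowed with `GlobalWindows.create()` and no `.trigger(...)` call buffers elements but never invokes the `JoinFunction`, and nothing reaches `print()`. The plain-Java toy below models only that mechanism; `ToyGlobalWindow` and its trigger-as-predicate design are hypothetical and are not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// Hypothetical toy model (not Flink API) of one global window with a pluggable trigger.
class ToyGlobalWindow<T> {
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> firings = new ArrayList<>();
    // Trigger: given the current buffer size, decide whether the window fires
    private final IntPredicate trigger;

    ToyGlobalWindow(IntPredicate trigger) {
        this.trigger = trigger;
    }

    void add(T element) {
        buffer.add(element);
        if (trigger.test(buffer.size())) {
            // Fire without purging: the window function sees a snapshot of the whole buffer
            firings.add(new ArrayList<>(buffer));
        }
    }

    // Every snapshot the window function would have been called with
    List<List<T>> firings() {
        return firings;
    }
}
```

With a never-firing trigger (`n -> false`), three added elements produce no firings at all. With a trigger that fires on every element (roughly what `CountTrigger.of(1)` does) there are three firings, the last containing all three elements. Because firing without purging leaves the buffer intact, a real windowed join set up this way would, as far as the toy suggests, emit earlier matches again on later firings, which matters when choosing a trigger.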
I expected "activationWindowStream" to contain the "splittedActivationTuple"
data (the smaller set), with its Double value accumulated whenever an
incoming element of unionReloadsStream has a matching "String" field.
But that is not happening. What am I missing?
Thanks,
Rakkesh