Dear Friends,
         I have 2 streams of the below data types.

DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;

DataStream<Tuple2<String, Double>> unionReloadsStream;

These streams are getting data from Kafka and getting data in different
frequencies. "unionReloadsStream"  will receive more data than
"splittedActivationTuple". I need to store  "splittedActivationTuple" in a
Window of 24 hours and manipulate its "Double" field, if a matching data
comes from unionReloadsStream (String field is the common field).

So I wrote the following method to do this task.


public static DataStream<Tuple3<String, Integer, Double>>
joinActivationsBasedOnReload(
            DataStream<Tuple3<String, Integer, Double>> activationsStream,
            DataStream<Tuple2<String, Double>> unifiedReloadStream) {

        return activationsStream.join(unifiedReloadStream).where(new
ActivationStreamSelector())
                .equalTo(new
ReloadStreamSelector()).window(GlobalWindows.create())
                .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
                .apply(new JoinFunction<Tuple3<String, Integer, Double>,
Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple3<String, Integer, Double>
join(Tuple3<String, Integer, Double> first,
                            Tuple2<String, Double> second) {
                        return new Tuple3<String, Integer,
Double>(first.f0, first.f1, first.f2 + second.f1);
                    }
                });
    }


and calling as,

DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);

activationWindowStream.print();


But I couldn't see anything printing.

I expected "activationWindowStream" to contain the
"splittedActivationTuple" (smaller set) data and the Double value
accumulated if  unionReloadsStream's incoming elements have a matching
"String" field. But that is not happening. Where I am missing?

Thanks,
Rakkesh

Reply via email to