Dear Friends,

I have two streams of the following types:

    DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
    DataStream<Tuple2<String, Double>> unionReloadsStream;

Both streams read from Kafka, but they receive data at different rates: "unionReloadsStream" receives more data than "splittedActivationTuple".

I need to keep the "splittedActivationTuple" elements in a window of 24 hours and update their "Double" field whenever a matching element arrives on "unionReloadsStream" (the String field is the common key). I wrote the following method to do this:

    public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
            DataStream<Tuple3<String, Integer, Double>> activationsStream,
            DataStream<Tuple2<String, Double>> unifiedReloadStream) {

        return activationsStream.join(unifiedReloadStream)
                .where(new ActivationStreamSelector())
                .equalTo(new ReloadStreamSelector())
                .window(GlobalWindows.create())
                .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
                .apply(new JoinFunction<Tuple3<String, Integer, Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple3<String, Integer, Double> join(Tuple3<String, Integer, Double> first,
                            Tuple2<String, Double> second) {
                        return new Tuple3<String, Integer, Double>(first.f0, first.f1, first.f2 + second.f1);
                    }
                });
    }

and I call it as follows:

    DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
            joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
    activationWindowStream.print();

But nothing is printed. I expected "activationWindowStream" to contain the "splittedActivationTuple" data (the smaller set), with the Double value accumulated whenever an incoming "unionReloadsStream" element has a matching String field. That is not happening.

What am I missing?

Thanks,
Rakkesh
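P.S. To make the expected behaviour concrete, here is a minimal plain-Java sketch of the semantics I am after. It is not Flink code, and the class and method names are hypothetical, made up only for this illustration. It assumes that the 24 hours are counted from the arrival of the activation, and that a later activation for the same key replaces the earlier one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: models the behaviour I expect, without Flink.
class ExpectedJoinSketch {

    // Assumption: an activation stays joinable for 24 hours after it arrives.
    static final long WINDOW_MILLIS = 24L * 60 * 60 * 1000;

    // Stand-in for Tuple3<String, Integer, Double> plus an arrival timestamp.
    static final class Activation {
        final String key;
        final int count;
        double amount;
        final long arrivedAt;

        Activation(String key, int count, double amount, long arrivedAt) {
            this.key = key;
            this.count = count;
            this.amount = amount;
            this.arrivedAt = arrivedAt;
        }
    }

    // Activations currently held, indexed by the common String field.
    private final Map<String, Activation> activations = new HashMap<>();

    // Called for every element of the activation stream.
    void onActivation(String key, int count, double amount, long now) {
        activations.put(key, new Activation(key, count, amount, now));
    }

    // Called for every element of the reload stream.
    // Returns the accumulated amount, or null when there is no activation
    // for this key or the activation is older than 24 hours.
    Double onReload(String key, double reloadAmount, long now) {
        Activation activation = activations.get(key);
        if (activation == null) {
            return null;
        }
        if (now - activation.arrivedAt > WINDOW_MILLIS) {
            activations.remove(key); // expired, drop it
            return null;
        }
        activation.amount += reloadAmount;
        return activation.amount;
    }
}
```

In other words, every reload with a matching key should immediately produce an updated activation, and reloads without a live activation should produce nothing.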