Hi Rakkesh,

Did you call `execute()` on your `StreamExecutionEnvironment`?
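
In case it helps, here is a toy sketch in plain Java of why a missing `execute()` would leave `print()` silent. `MiniEnv` and `MiniStream` are hypothetical stand-ins made up for illustration, not Flink classes; the only point is that calls such as `print()` merely register work, and that work runs once `execute()` is invoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LazyPipelineSketch {

    /** Hypothetical stand-in for an execution environment; NOT Flink's API. */
    static class MiniEnv {
        // Steps registered so far; nothing here runs until execute() is called.
        private final List<Runnable> plan = new ArrayList<>();

        /** Declares a source and returns a handle for attaching a sink. */
        MiniStream fromElements(String... elements) {
            return new MiniStream(this, List.of(elements));
        }

        /** Runs every registered step, loosely analogous to submitting the job. */
        void execute() {
            for (Runnable step : plan) {
                step.run();
            }
        }
    }

    /** Hypothetical stand-in for a stream handle; NOT Flink's DataStream. */
    static class MiniStream {
        private final MiniEnv env;
        private final List<String> elements;

        MiniStream(MiniEnv env, List<String> elements) {
            this.env = env;
            this.elements = elements;
        }

        /** Only registers the sink with the environment; emits nothing yet. */
        void print(Consumer<String> sink) {
            env.plan.add(() -> elements.forEach(sink));
        }
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        List<String> printed = new ArrayList<>();

        // Defining the pipeline does not produce any output by itself.
        env.fromElements("a", "b").print(printed::add);
        System.out.println("before execute(): " + printed.size() + " element(s)");

        // Only now do the registered steps actually run.
        env.execute();
        System.out.println("after execute(): " + printed.size() + " element(s)");
    }
}
```

In an actual Flink job the corresponding call would be `env.execute()` on your `StreamExecutionEnvironment`, placed after the whole pipeline (including `activationWindowStream.print()`) has been defined.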

Best,
Xingcan

> On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
>
> Dear Friends,
> I have 2 streams of the below data types.
>
> DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
>
> DataStream<Tuple2<String, Double>> unionReloadsStream;
>
> These streams are getting data from Kafka and getting data in different
> frequencies. "unionReloadsStream" will receive more data than
> "splittedActivationTuple". I need to store "splittedActivationTuple" in a
> Window of 24 hours and manipulate its "Double" field, if a matching data
> comes from unionReloadsStream (String field is the common field).
>
> So I wrote the following method to do this task.
>
>
> public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
>         DataStream<Tuple3<String, Integer, Double>> activationsStream,
>         DataStream<Tuple2<String, Double>> unifiedReloadStream) {
>
>     return activationsStream.join(unifiedReloadStream)
>             .where(new ActivationStreamSelector())
>             .equalTo(new ReloadStreamSelector())
>             .window(GlobalWindows.create())
>             .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>             .apply(new JoinFunction<Tuple3<String, Integer, Double>,
>                     Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
>                 private static final long serialVersionUID = 1L;
>                 @Override
>                 public Tuple3<String, Integer, Double> join(
>                         Tuple3<String, Integer, Double> first,
>                         Tuple2<String, Double> second) {
>                     return new Tuple3<String, Integer, Double>(first.f0,
>                             first.f1, first.f2 + second.f1);
>                 }
>             });
> }
>
>
> and calling as,
>
> DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
>         joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
>
> activationWindowStream.print();
>
>
> But I couldn't see anything printing.
>
> I expected "activationWindowStream" to contain the "splittedActivationTuple"
> (smaller set) data and the Double value accumulated if unionReloadsStream's
> incoming elements have a matching "String" field. But that is not happening.
> Where I am missing?
>
> Thanks,
> Rakkesh