Thanks for the reply. I have called "env.execute()". But nothing getting printed. I have a doubt whether "implemented function" is correct with my "requirement". Please assist.
On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote: > Hi Rakkesh, > > Did you call `execute()`on your `StreamExecutionEnvironment`? > > Best, > Xingcan > > > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> > wrote: > > > > Dear Friends, > > I have 2 streams of the below data types. > > > > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple; > > > > DataStream<Tuple2<String, Double>> unionReloadsStream; > > > > These streams are getting data from Kafka and getting data in different > frequencies. "unionReloadsStream" will receive more data than > "splittedActivationTuple". I need to store "splittedActivationTuple" in a > Window of 24 hours and manipulate its "Double" field, if a matching data > comes from unionReloadsStream (String field is the common field). > > > > So I wrote the following method to do this task. > > > > > > public static DataStream<Tuple3<String, Integer, Double>> > joinActivationsBasedOnReload( > > DataStream<Tuple3<String, Integer, Double>> > activationsStream, > > DataStream<Tuple2<String, Double>> unifiedReloadStream) { > > > > return activationsStream.join(unifiedReloadStream).where(new > ActivationStreamSelector()) > > .equalTo(new ReloadStreamSelector()). > window(GlobalWindows.create()) > > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS))) > > .apply(new JoinFunction<Tuple3<String, Integer, Double>, > Tuple2<String, Double>, Tuple3<String, Integer, Double>>() { > > private static final long serialVersionUID = 1L; > > @Override > > public Tuple3<String, Integer, Double> > join(Tuple3<String, Integer, Double> first, > > Tuple2<String, Double> second) { > > return new Tuple3<String, Integer, > Double>(first.f0, first.f1, first.f2 + second.f1); > > } > > }); > > } > > > > > > and calling as, > > > > DataStream<Tuple3<String, Integer, Double>> activationWindowStream = > joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream); > > > > activationWindowStream.print(); > > > > > > But I couldn't see anything printing. > > > > I expected "activationWindowStream" to contain the > "splittedActivationTuple" (smaller set) data and the Double value > accumulated if unionReloadsStream's incoming elements have a matching > "String" field. But that is not happening. Where I am missing? > > > > Thanks, > > Rakkesh > >