Thanks for the reply. I have called "env.execute()", but nothing is getting
printed. I doubt whether the function I implemented is correct for my
requirement. Please assist.

On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Rakkesh,
>
> Did you call `execute()` on your `StreamExecutionEnvironment`?
>
> Best,
> Xingcan
>
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
> >
> > Dear Friends,
> >          I have 2 streams of the below data types.
> >
> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
> >
> > DataStream<Tuple2<String, Double>> unionReloadsStream;
> >
> > These streams receive data from Kafka at different frequencies.
> > "unionReloadsStream" will receive more data than
> > "splittedActivationTuple". I need to keep "splittedActivationTuple" in a
> > window of 24 hours and update its "Double" field whenever matching data
> > arrives from "unionReloadsStream" (the String field is the common field).
> >
> > So I wrote the following method to do this task.
> >
> >
> > public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
> >         DataStream<Tuple3<String, Integer, Double>> activationsStream,
> >         DataStream<Tuple2<String, Double>> unifiedReloadStream) {
> >
> >     return activationsStream.join(unifiedReloadStream)
> >             .where(new ActivationStreamSelector())
> >             .equalTo(new ReloadStreamSelector())
> >             .window(GlobalWindows.create())
> >             .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> >             .apply(new JoinFunction<Tuple3<String, Integer, Double>,
> >                     Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
> >                 private static final long serialVersionUID = 1L;
> >
> >                 @Override
> >                 public Tuple3<String, Integer, Double> join(
> >                         Tuple3<String, Integer, Double> first,
> >                         Tuple2<String, Double> second) {
> >                     return new Tuple3<String, Integer, Double>(
> >                             first.f0, first.f1, first.f2 + second.f1);
> >                 }
> >             });
> > }
> >
> >
> > and calling as,
> >
> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
> >         joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> >
> > activationWindowStream.print();
> >
> >
> > But I couldn't see anything printing.
> >
> > I expected "activationWindowStream" to contain the
> > "splittedActivationTuple" (smaller set) data, with the Double value
> > accumulated whenever an incoming element of "unionReloadsStream" has a
> > matching "String" field. But that is not happening. What am I missing?
> >
> > Thanks,
> > Rakkesh
>
>
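
For reference, the behaviour described in the original question (keep each activation for 24 hours and add the amount of every reload with the same String key) can be modelled in plain Java as below. This is only a sketch of the intended semantics, not Flink code: the class ActivationReloadSketch, its methods, and the explicit millisecond timestamps are hypothetical names introduced for illustration, and it assumes the 24 hours are counted from the arrival of the activation. Separately, a GlobalWindows assigner uses a default trigger that never fires, so a join over it will not emit results unless a trigger is set explicitly; that may be related to the empty output.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the requirement; hypothetical, not part of Flink. */
class ActivationReloadSketch {

    /** Mirrors Tuple3<String, Integer, Double> plus the arrival time. */
    static final class Activation {
        final String key;
        final int count;
        final double amount;
        final long arrivalMs;

        Activation(String key, int count, double amount, long arrivalMs) {
            this.key = key;
            this.count = count;
            this.amount = amount;
            this.arrivalMs = arrivalMs;
        }
    }

    private final long retentionMs;
    private final Map<String, Activation> activations = new HashMap<>();

    ActivationReloadSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Stores (or replaces) the activation for this key. */
    void onActivation(String key, int count, double amount, long nowMs) {
        activations.put(key, new Activation(key, count, amount, nowMs));
    }

    /**
     * Adds the reload amount to a stored activation with the same key.
     * Returns empty if there is no activation or it is older than the
     * retention period (assumption: age is measured from arrival time).
     */
    Optional<Activation> onReload(String key, double reloadAmount, long nowMs) {
        Activation stored = activations.get(key);
        if (stored == null) {
            return Optional.empty();
        }
        if (nowMs - stored.arrivalMs > retentionMs) {
            activations.remove(key); // expired, drop it
            return Optional.empty();
        }
        Activation updated = new Activation(
                key, stored.count, stored.amount + reloadAmount, stored.arrivalMs);
        activations.put(key, updated);
        return Optional.of(updated);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        ActivationReloadSketch sketch = new ActivationReloadSketch(day);
        sketch.onActivation("user-1", 3, 10.0, 0L);
        sketch.onReload("user-1", 2.5, 1_000L)
              .ifPresent(a -> System.out.println(a.key + " " + a.count + " " + a.amount));
    }
}
```

In a Flink job, one way to get comparable behaviour might be to connect the two keyed streams and hold the activation in keyed state with a timer for cleanup, rather than relying on a windowed join; whether that fits depends on the exact requirement.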
