Thanks, Xingcan. I specified `GlobalWindows` because I want to keep every element arriving on splittedActivationTuple for 24 hours, and then operate on those stored elements whenever a matching element arrives on the "unionReloadsStream" stream (the bigger set).
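To make the requirement concrete, here is a minimal plain-Java sketch of the behaviour I am after, with no Flink involved. The class and method names (`ActivationStore`, `onActivation`, `onReload`) are hypothetical, timestamps are passed in by hand, and two details are my assumptions: expiry is measured from the activation's arrival time, and a later activation with the same key replaces the earlier one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model (not Flink) of "keep activations for 24 hours, add matching reloads". */
class ActivationStore {
    /** Assumed expiry: 24 hours after the activation arrived. */
    static final long EXPIRY_MS = 24L * 60 * 60 * 1000;

    /** One stored activation, mirroring Tuple3<String, Integer, Double> plus an arrival time. */
    static final class Activation {
        final String key;
        final int count;
        double amount;
        final long arrivedAtMs;

        Activation(String key, int count, double amount, long arrivedAtMs) {
            this.key = key;
            this.count = count;
            this.amount = amount;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    private final Map<String, Activation> byKey = new HashMap<>();

    /** Stores an activation; a later activation with the same key replaces the earlier one. */
    void onActivation(String key, int count, double amount, long nowMs) {
        byKey.put(key, new Activation(key, count, amount, nowMs));
    }

    /**
     * Adds a reload to the stored activation with the same key.
     * Returns the accumulated amount, or empty if there is no live match.
     */
    Optional<Double> onReload(String key, double reloadAmount, long nowMs) {
        Activation a = byKey.get(key);
        if (a == null) {
            return Optional.empty();
        }
        if (nowMs - a.arrivedAtMs >= EXPIRY_MS) {
            byKey.remove(key); // expired: drop it and report no match
            return Optional.empty();
        }
        a.amount += reloadAmount;
        return Optional.of(a.amount);
    }

    public static void main(String[] args) {
        ActivationStore store = new ActivationStore();
        store.onActivation("A", 1, 10.0, 0L);
        System.out.println(store.onReload("A", 5.0, 1_000L));    // matching key, within 24 hours
        System.out.println(store.onReload("B", 1.0, 1_000L));    // no activation stored for "B"
        System.out.println(store.onReload("A", 1.0, EXPIRY_MS)); // activation has expired
    }
}
```

In other words, every reload should produce an updated activation as soon as it arrives, as long as a matching activation from the last 24 hours is still stored.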
On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <[email protected]> wrote:

> Hi Rakkesh,
>
> The `GlobalWindow` is commonly used for custom window assignment and you
> should specify a `trigger` for it [1].
> If the built-in window (e.g., tumbling window or sliding window) join in
> DataStream API fails to meet the requirements,
> you could try the time-windowed join in Table/SQL API [2].
>
> Hope that helps.
>
> Best,
> Xingcan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>
>
> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <[email protected]> wrote:
>
> Thanks for the reply. I have called "env.execute()". But nothing getting
> printed. I have a doubt whether "implemented function" is correct with my
> "requirement". Please assist.
>
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <[email protected]> wrote:
>
>> Hi Rakkesh,
>>
>> Did you call `execute()` on your `StreamExecutionEnvironment`?
>>
>> Best,
>> Xingcan
>>
>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <[email protected]> wrote:
>> >
>> > Dear Friends,
>> > I have 2 streams of the below data types.
>> >
>> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
>> >
>> > DataStream<Tuple2<String, Double>> unionReloadsStream;
>> >
>> > These streams are getting data from Kafka and getting data in different
>> > frequencies. "unionReloadsStream" will receive more data than
>> > "splittedActivationTuple". I need to store "splittedActivationTuple" in a
>> > Window of 24 hours and manipulate its "Double" field, if a matching data
>> > comes from unionReloadsStream (String field is the common field).
>> >
>> > So I wrote the following method to do this task.
>> >
>> >
>> > public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
>> >         DataStream<Tuple3<String, Integer, Double>> activationsStream,
>> >         DataStream<Tuple2<String, Double>> unifiedReloadStream) {
>> >
>> >     return activationsStream.join(unifiedReloadStream).where(new ActivationStreamSelector())
>> >             .equalTo(new ReloadStreamSelector()).window(GlobalWindows.create())
>> >             .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>> >             .apply(new JoinFunction<Tuple3<String, Integer, Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
>> >                 private static final long serialVersionUID = 1L;
>> >                 @Override
>> >                 public Tuple3<String, Integer, Double> join(Tuple3<String, Integer, Double> first,
>> >                         Tuple2<String, Double> second) {
>> >                     return new Tuple3<String, Integer, Double>(first.f0, first.f1, first.f2 + second.f1);
>> >                 }
>> >             });
>> > }
>> >
>> >
>> > and calling as,
>> >
>> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
>> >         joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
>> >
>> > activationWindowStream.print();
>> >
>> >
>> > But I couldn't see anything printing.
>> >
>> > I expected "activationWindowStream" to contain the
>> > "splittedActivationTuple" (smaller set) data and the Double value
>> > accumulated if unionReloadsStream's incoming elements have a matching
>> > "String" field. But that is not happening. Where I am missing?
>> >
>> > Thanks,
>> > Rakkesh
