Hi Rakkesh,

`GlobalWindows` is mainly meant for custom window assignment, so you need to specify a `trigger` for it [1]. A global window has no natural end, and without a trigger it never fires, which would explain why nothing is printed. If the built-in window joins in the DataStream API (e.g., with tumbling or sliding windows) do not meet your requirements, you could try the time-windowed join in the Table/SQL API [2].
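
To make the trigger point concrete, here is a minimal plain-Java sketch. It is not Flink code and uses no Flink classes: `GlobalWindowSketch`, `neverTrigger()` and `countTrigger()` are hypothetical names invented for this illustration. It only models the idea that a window which never ends buffers elements and emits a result solely when its trigger fires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

/** Toy model of a single never-ending window; an illustration only, NOT Flink code. */
final class GlobalWindowSketch {
    private final List<Double> buffer = new ArrayList<>();   // elements held by the window
    private final List<Double> emitted = new ArrayList<>();  // results produced on each firing
    private final IntPredicate trigger;                      // given the buffer size, fire or not

    GlobalWindowSketch(IntPredicate trigger) {
        this.trigger = trigger;
    }

    /** Models a window with no trigger configured: it never fires. */
    static IntPredicate neverTrigger() {
        return size -> false;
    }

    /** Models a count-based trigger: fires on every n-th buffered element. */
    static IntPredicate countTrigger(int n) {
        return size -> size % n == 0;
    }

    /** Buffers the element; the window function (a sum here) runs only if the trigger fires. */
    void add(double value) {
        buffer.add(value);
        if (trigger.test(buffer.size())) {
            double sum = 0.0;
            for (double v : buffer) {
                sum += v;
            }
            emitted.add(sum);
        }
    }

    /** Returns a copy of everything emitted so far. */
    List<Double> emitted() {
        return new ArrayList<>(emitted);
    }

    public static void main(String[] args) {
        GlobalWindowSketch silent = new GlobalWindowSketch(neverTrigger());
        GlobalWindowSketch firing = new GlobalWindowSketch(countTrigger(1));
        for (double v : new double[] {10.0, 5.0, 2.5}) {
            silent.add(v);
            firing.add(v);
        }
        System.out.println("never trigger: " + silent.emitted()); // stays empty
        System.out.println("count trigger: " + firing.emitted()); // one result per element
    }
}
```

In your job, the analogous step would be a `.trigger(...)` call after `.window(GlobalWindows.create())`, for example with the built-in `CountTrigger.of(1)`. Whether firing on every element actually fits your requirement is an assumption you would need to check.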

Hope that helps.

Best,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins

> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
>
> Thanks for the reply. I have called "env.execute()". But nothing getting
> printed. I have a doubt whether "implemented function" is correct with my
> "requirement". Please assist.
>
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> Hi Rakkesh,
>
> Did you call `execute()` on your `StreamExecutionEnvironment`?
>
> Best,
> Xingcan
>
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
> >
> > Dear Friends,
> > I have 2 streams of the below data types.
> >
> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
> >
> > DataStream<Tuple2<String, Double>> unionReloadsStream;
> >
> > These streams are getting data from Kafka and getting data in different
> > frequencies. "unionReloadsStream" will receive more data than
> > "splittedActivationTuple". I need to store "splittedActivationTuple" in a
> > Window of 24 hours and manipulate its "Double" field, if a matching data
> > comes from unionReloadsStream (String field is the common field).
> >
> > So I wrote the following method to do this task.
> >
> > public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
> >         DataStream<Tuple3<String, Integer, Double>> activationsStream,
> >         DataStream<Tuple2<String, Double>> unifiedReloadStream) {
> >
> >     return activationsStream.join(unifiedReloadStream)
> >             .where(new ActivationStreamSelector())
> >             .equalTo(new ReloadStreamSelector())
> >             .window(GlobalWindows.create())
> >             .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> >             .apply(new JoinFunction<Tuple3<String, Integer, Double>,
> >                     Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
> >                 private static final long serialVersionUID = 1L;
> >
> >                 @Override
> >                 public Tuple3<String, Integer, Double> join(
> >                         Tuple3<String, Integer, Double> first,
> >                         Tuple2<String, Double> second) {
> >                     return new Tuple3<String, Integer, Double>(
> >                             first.f0, first.f1, first.f2 + second.f1);
> >                 }
> >             });
> > }
> >
> > and calling as,
> >
> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
> >         joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> >
> > activationWindowStream.print();
> >
> > But I couldn't see anything printing.
> >
> > I expected "activationWindowStream" to contain the
> > "splittedActivationTuple" (smaller set) data and the Double value
> > accumulated if unionReloadsStream's incoming elements have a matching
> > "String" field. But that is not happening. Where I am missing?
> >
> > Thanks,
> > Rakkesh