Hi Rakkesh, As Xingcan said, the trigger is required by the window to FIRE, you can use time window (contains a inner trigger) or (ProcessFunction + State + Timer).
Thanks, vino. 2018-07-18 21:44 GMT+08:00 Titus Rakkesh <titus.rakk...@gmail.com>: > Thanks Xingcan. I specified as GlobalWindow since I am going to put all > the elements coming with splittedActivationTuple with a 24 hour expiry and > then do operations on that when elements coming with stream > "unionReloadsStream" (bigger set). > > On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <xingc...@gmail.com> wrote: > >> Hi Rakkesh, >> >> The `GlobalWindow` is commonly used for custom window assignment and you >> should specify a `trigger` for it [1]. >> If the built-in window (e.g., tumbling window or sliding window) join in >> DataStream API fails to meet the requirements, >> you could try the time-windowed join in Table/SQL API [2]. >> >> Hope that helps. >> >> Best, >> Xingcan >> >> [1] https://ci.apache.org/projects/flink/flink-docs-master/ >> dev/stream/operators/windows.html#global-windows >> [2] https://ci.apache.org/projects/flink/flink-docs-master/ >> dev/table/sql.html#joins >> >> >> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <titus.rakk...@gmail.com> >> wrote: >> >> Thanks for the reply. I have called "env.execute()". But nothing getting >> printed. I have a doubt whether "implemented function" is correct with my >> "requirement". Please assist. >> >> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote: >> >>> Hi Rakkesh, >>> >>> Did you call `execute()`on your `StreamExecutionEnvironment`? >>> >>> Best, >>> Xingcan >>> >>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com> >>> wrote: >>> > >>> > Dear Friends, >>> > I have 2 streams of the below data types. >>> > >>> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple; >>> > >>> > DataStream<Tuple2<String, Double>> unionReloadsStream; >>> > >>> > These streams are getting data from Kafka and getting data in >>> different frequencies. "unionReloadsStream" will receive more data than >>> "splittedActivationTuple". I need to store "splittedActivationTuple" in a >>> Window of 24 hours and manipulate its "Double" field, if a matching data >>> comes from unionReloadsStream (String field is the common field). >>> > >>> > So I wrote the following method to do this task. >>> > >>> > >>> > public static DataStream<Tuple3<String, Integer, Double>> >>> joinActivationsBasedOnReload( >>> > DataStream<Tuple3<String, Integer, Double>> >>> activationsStream, >>> > DataStream<Tuple2<String, Double>> unifiedReloadStream) { >>> > >>> > return activationsStream.join(unifiedReloadStream).where(new >>> ActivationStreamSelector()) >>> > .equalTo(new ReloadStreamSelector()).window >>> (GlobalWindows.create()) >>> > .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS))) >>> > .apply(new JoinFunction<Tuple3<String, Integer, >>> Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() { >>> > private static final long serialVersionUID = 1L; >>> > @Override >>> > public Tuple3<String, Integer, Double> >>> join(Tuple3<String, Integer, Double> first, >>> > Tuple2<String, Double> second) { >>> > return new Tuple3<String, Integer, >>> Double>(first.f0, first.f1, first.f2 + second.f1); >>> > } >>> > }); >>> > } >>> > >>> > >>> > and calling as, >>> > >>> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream = >>> joinActivationsBasedOnReload(splittedActivationTuple, >>> unionReloadsStream); >>> > >>> > activationWindowStream.print(); >>> > >>> > >>> > But I couldn't see anything printing. >>> > >>> > I expected "activationWindowStream" to contain the >>> "splittedActivationTuple" (smaller set) data and the Double value >>> accumulated if unionReloadsStream's incoming elements have a matching >>> "String" field. But that is not happening. Where I am missing? >>> > >>> > Thanks, >>> > Rakkesh >>> >>> >> >> >