Hi Rakkesh,

The `GlobalWindow` is commonly used for custom window assignment and you should 
specify a `trigger` for it [1].
If the built-in window (e.g., tumbling window or sliding window) join in 
DataStream API fails to meet the requirements,
you could try the time-windowed join in Table/SQL API [2].

Hope that helps.

Best,
Xingcan 

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows>
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins>


> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <titus.rakk...@gmail.com> wrote:
> 
> Thanks for the reply. I have called "env.execute()". But nothing getting 
> printed. I have a doubt whether "implemented function" is correct with my 
> "requirement". Please assist. 
> 
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com 
> <mailto:xingc...@gmail.com>> wrote:
> Hi Rakkesh,
> 
> Did you call `execute()`on your `StreamExecutionEnvironment`?
> 
> Best,
> Xingcan 
> 
> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com 
> > <mailto:titus.rakk...@gmail.com>> wrote:
> > 
> > Dear Friends,
> >          I have 2 streams of the below data types.
> > 
> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
> > 
> > DataStream<Tuple2<String, Double>> unionReloadsStream;
> > 
> > These streams are getting data from Kafka and getting data in different 
> > frequencies. "unionReloadsStream"  will receive more data than 
> > "splittedActivationTuple". I need to store  "splittedActivationTuple" in a 
> > Window of 24 hours and manipulate its "Double" field, if a matching data 
> > comes from unionReloadsStream (String field is the common field).
> > 
> > So I wrote the following method to do this task.
> > 
> > 
> > public static DataStream<Tuple3<String, Integer, Double>> 
> > joinActivationsBasedOnReload(
> >             DataStream<Tuple3<String, Integer, Double>> activationsStream,
> >             DataStream<Tuple2<String, Double>> unifiedReloadStream) {
> >         
> >         return activationsStream.join(unifiedReloadStream).where(new 
> > ActivationStreamSelector())
> >                 .equalTo(new 
> > ReloadStreamSelector()).window(GlobalWindows.create())
> >                 .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
> >                 .apply(new JoinFunction<Tuple3<String, Integer, Double>, 
> > Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
> >                     private static final long serialVersionUID = 1L;
> >                     @Override
> >                     public Tuple3<String, Integer, Double> 
> > join(Tuple3<String, Integer, Double> first,
> >                             Tuple2<String, Double> second) {
> >                         return new Tuple3<String, Integer, 
> > Double>(first.f0, first.f1, first.f2 + second.f1);
> >                     }
> >                 });
> >     }
> > 
> > 
> > and calling as,
> > 
> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream = 
> > joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
> >         
> > activationWindowStream.print();
> > 
> > 
> > But I couldn't see anything printing. 
> > 
> > I expected "activationWindowStream" to contain the 
> > "splittedActivationTuple" (smaller set) data and the Double value 
> > accumulated if  unionReloadsStream's incoming elements have a matching 
> > "String" field. But that is not happening. Where I am missing?
> > 
> > Thanks,
> > Rakkesh
> 
> 

Reply via email to