Hi Rakkesh,

As Xingcan said, a trigger is required for the window to FIRE. You can use a
time window (which contains a built-in trigger) or the combination of
ProcessFunction + State + Timer.
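
For the latter, here is a minimal sketch (untested against your job) of a keyed
CoProcessFunction that keeps each activation in state with a 24-hour
processing-time expiry and accumulates matching reloads. The class, state, and
variable names are only placeholders:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class ActivationReloadFunction extends CoProcessFunction<
        Tuple3<String, Integer, Double>,   // activations
        Tuple2<String, Double>,            // reloads
        Tuple3<String, Integer, Double>> { // updated activations

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // 24-hour expiry

    private transient ValueState<Tuple3<String, Integer, Double>> activation;

    @Override
    public void open(Configuration parameters) {
        activation = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "activation",
                TypeInformation.of(new TypeHint<Tuple3<String, Integer, Double>>() {})));
    }

    @Override
    public void processElement1(Tuple3<String, Integer, Double> act, Context ctx,
            Collector<Tuple3<String, Integer, Double>> out) throws Exception {
        // Store the activation and schedule its expiry 24 hours from now
        // (this sketch assumes at most one live activation per key).
        activation.update(act);
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
    }

    @Override
    public void processElement2(Tuple2<String, Double> reload, Context ctx,
            Collector<Tuple3<String, Integer, Double>> out) throws Exception {
        Tuple3<String, Integer, Double> act = activation.value();
        if (act != null) {
            // A reload matched a stored activation: accumulate its amount and emit.
            act.f2 += reload.f1;
            activation.update(act);
            out.collect(act);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx,
            Collector<Tuple3<String, Integer, Double>> out) throws Exception {
        // Expire the stored activation 24 hours after it was written.
        activation.clear();
    }
}

You would then wire it up roughly as follows (the lambda key selectors may need
to be replaced by explicit KeySelector classes depending on your Flink version):

splittedActivationTuple
        .connect(unionReloadsStream)
        .keyBy(a -> a.f0, r -> r.f0)
        .process(new ActivationReloadFunction())
        .print();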

Thanks, vino.

2018-07-18 21:44 GMT+08:00 Titus Rakkesh <titus.rakk...@gmail.com>:

> Thanks Xingcan. I specified a GlobalWindow since I am going to keep all the
> elements arriving on splittedActivationTuple with a 24-hour expiry and then
> operate on them when elements arrive on the stream "unionReloadsStream" (the
> bigger set).
>
> On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Rakkesh,
>>
>> The `GlobalWindow` is commonly used for custom window assignment, and you
>> should specify a `trigger` for it [1].
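>>
>> For instance, here is a minimal sketch of your join with an explicit trigger
>> (CountTrigger.of(1) fires the window on every arriving element; pick
>> whichever built-in or custom trigger matches your semantics):
>>
>> activationsStream.join(unifiedReloadStream)
>>     .where(new ActivationStreamSelector())
>>     .equalTo(new ReloadStreamSelector())
>>     .window(GlobalWindows.create())
>>     .trigger(CountTrigger.of(1)) // a GlobalWindow never fires without a trigger
>>     .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>>     .apply(...); // your JoinFunction unchanged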
>> If the built-in window joins (e.g., tumbling or sliding windows) in the
>> DataStream API fail to meet your requirements, you could try the
>> time-windowed join in the Table/SQL API [2].
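>>
>> As a rough sketch of [2], assuming the two streams were registered as tables
>> Activations(id, cnt, amount, rtime) and Reloads(id, amount, rtime) with
>> event-time attributes (all of these names are hypothetical):
>>
>> SELECT a.id, a.cnt, a.amount + r.amount
>> FROM Activations a, Reloads r
>> WHERE a.id = r.id
>>   AND r.rtime BETWEEN a.rtime AND a.rtime + INTERVAL '24' HOUR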
>>
>> Hope that helps.
>>
>> Best,
>> Xingcan
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#global-windows
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#joins
>>
>>
>> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <titus.rakk...@gmail.com>
>> wrote:
>>
>> Thanks for the reply. I have called "env.execute()", but nothing is getting
>> printed. I suspect my implemented function does not match my requirement.
>> Please assist.
>>
>> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>>
>>> Hi Rakkesh,
>>>
>>> Did you call `execute()` on your `StreamExecutionEnvironment`?
>>>
>>> Best,
>>> Xingcan
>>>
>>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com>
>>> wrote:
>>> >
>>> > Dear Friends,
>>> > I have two streams of the following data types:
>>> >
>>> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
>>> >
>>> > DataStream<Tuple2<String, Double>> unionReloadsStream;
>>> >
>>> > These streams receive data from Kafka at different frequencies, and
>>> > "unionReloadsStream" will receive more data than "splittedActivationTuple".
>>> > I need to keep the "splittedActivationTuple" elements in a window for 24
>>> > hours and update their "Double" field whenever a matching element (the
>>> > "String" field is the common key) arrives on unionReloadsStream.
>>> >
>>> > So I wrote the following method to do this task.
>>> >
>>> >
>>> > public static DataStream<Tuple3<String, Integer, Double>> joinActivationsBasedOnReload(
>>> >         DataStream<Tuple3<String, Integer, Double>> activationsStream,
>>> >         DataStream<Tuple2<String, Double>> unifiedReloadStream) {
>>> >
>>> >     return activationsStream.join(unifiedReloadStream)
>>> >             .where(new ActivationStreamSelector())
>>> >             .equalTo(new ReloadStreamSelector())
>>> >             .window(GlobalWindows.create())
>>> >             .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>>> >             .apply(new JoinFunction<Tuple3<String, Integer, Double>,
>>> >                     Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
>>> >                 private static final long serialVersionUID = 1L;
>>> >
>>> >                 @Override
>>> >                 public Tuple3<String, Integer, Double> join(
>>> >                         Tuple3<String, Integer, Double> first,
>>> >                         Tuple2<String, Double> second) {
>>> >                     return new Tuple3<String, Integer, Double>(
>>> >                             first.f0, first.f1, first.f2 + second.f1);
>>> >                 }
>>> >             });
>>> > }
>>> >
>>> >
>>> > and calling as,
>>> >
>>> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
>>> >         joinActivationsBasedOnReload(splittedActivationTuple, unionReloadsStream);
>>> >
>>> > activationWindowStream.print();
>>> >
>>> >
>>> > But I couldn't see anything printing.
>>> >
>>> > I expected "activationWindowStream" to contain the "splittedActivationTuple"
>>> > (smaller set) data, with the Double value accumulated whenever an incoming
>>> > element of unionReloadsStream has a matching "String" field. But that is not
>>> > happening. Where am I going wrong?
>>> >
>>> > Thanks,
>>> > Rakkesh
>>>
>>>
>>
>>
>
