I meant: "Do you recommend maintaining the state in ValueState or in an
external store like Elasticsearch?"

On Thu, May 6, 2021 at 8:46 PM Swagat Mishra <swaga...@gmail.com> wrote:

> I want to aggregate user activity, e.g. the number of products a user has
> purchased in the last hour.
>
> So: user A (ID = USER-A) purchases one product at 10:30 AM, another
> product at 10:45 AM and another product at 1:30 PM.
>
> My API should return 2 products purchased if the API call happens at 11:29
> AM (10:30, 10:45) and 1 product if the API call happens at 1:45 PM.
>
> The API will access data persisted from the Flink streaming output.
>
> Currently I am doing keyBy on the user ID (e.g. USER-A).
>
> Do I have to maintain my own calculated state within the process window
> function? Is the process window function shared across all keys, or is
> there one instance per key? Do you recommend maintaining the state in
> Flink state or in Elasticsearch?
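A minimal sketch of the per-user bookkeeping, written in plain Java without Flink so it runs standalone. In a Flink job the map below would typically become keyed state (for example a `ListState<Long>` inside a `KeyedProcessFunction`): Flink runs one function instance per parallel subtask and each instance handles many keys, but keyed state is automatically scoped to the key of the current element. The class and method names are hypothetical, and the times reuse the example above, reading the last purchase and query as 1:30 PM and 1:45 PM.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of per-key state: purchase timestamps
// (epoch milliseconds) per user, queried over a sliding one-hour window.
class LastHourPurchaseCounter {
    static final long WINDOW_MILLIS = 60L * 60 * 1000;

    // Stand-in for Flink keyed state, which is already scoped per key.
    private final Map<String, Deque<Long>> purchasesByUser = new HashMap<>();

    // Assumes purchases arrive in timestamp order for each user.
    void recordPurchase(String userId, long timestampMillis) {
        purchasesByUser.computeIfAbsent(userId, k -> new ArrayDeque<>()).addLast(timestampMillis);
    }

    // Counts purchases in the interval (now - 1h, now].
    int countLastHour(String userId, long nowMillis) {
        long cutoff = nowMillis - WINDOW_MILLIS;
        int count = 0;
        for (long ts : purchasesByUser.getOrDefault(userId, new ArrayDeque<>())) {
            if (ts > cutoff && ts <= nowMillis) count++;
        }
        return count;
    }

    // What an event-time timer callback might do: drop entries at or before the cutoff
    // so that the state does not grow without bound.
    void evictUpTo(String userId, long cutoffMillis) {
        Deque<Long> purchases = purchasesByUser.get(userId);
        while (purchases != null && !purchases.isEmpty() && purchases.peekFirst() <= cutoffMillis) {
            purchases.pollFirst();
        }
    }

    // Helper: a time of day on 2021-05-06 (UTC) as epoch milliseconds.
    static long at(int hour, int minute) {
        return LocalDateTime.of(2021, 5, 6, hour, minute).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        LastHourPurchaseCounter counter = new LastHourPurchaseCounter();
        counter.recordPurchase("USER-A", at(10, 30));
        counter.recordPurchase("USER-A", at(10, 45));
        counter.recordPurchase("USER-A", at(13, 30));
        System.out.println(counter.countLastHour("USER-A", at(11, 29))); // prints 2
        System.out.println(counter.countLastHour("USER-A", at(13, 45))); // prints 1
    }
}
```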
>
> Also, if I switch from event time to processing time, the aggregation
> does happen. Is there a reason why Flink cannot provide event-time
> aggregations the way it provides processing-time aggregations?
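The difference between the two modes can be modelled in a few lines. With Flink's built-in triggers, an event-time window fires once the watermark reaches `window.maxTimestamp()` (the exclusive window end minus one millisecond), while a processing-time window fires once the machine clock does. If records carry no usable timestamps, or the watermark never advances, event-time windows never fire even though the same pipeline works in processing time. This is a simplified sketch rather than Flink's code; the class and method names are hypothetical.

```java
// Hypothetical simplified model (not Flink code) of when a window over
// [start, end) fires. TimeWindow#maxTimestamp() in Flink is end - 1.
class WindowFiringModel {
    static long maxTimestamp(long windowEndExclusive) {
        return windowEndExclusive - 1;
    }

    // Event time: driven by the watermark, which only advances if records
    // carry timestamps and a watermark generator emits progress.
    static boolean eventTimeWindowFires(long windowEndExclusive, long currentWatermark) {
        return currentWatermark >= maxTimestamp(windowEndExclusive);
    }

    // Processing time: driven by the machine clock, independent of the data.
    static boolean processingTimeWindowFires(long windowEndExclusive, long wallClockMillis) {
        return wallClockMillis >= maxTimestamp(windowEndExclusive);
    }

    public static void main(String[] args) {
        long end = 10_000L;
        System.out.println(eventTimeWindowFires(end, Long.MIN_VALUE)); // false: no watermark yet
        System.out.println(eventTimeWindowFires(end, 9_999L));         // true
        System.out.println(processingTimeWindowFires(end, 10_000L));   // true
    }
}
```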
>
>
>
> On Thu, May 6, 2021 at 7:11 PM Arvid Heise <ar...@apache.org> wrote:
>
>> I'm not sure what you want to achieve exactly.
>>
>> You can always keyBy the values by a constant pseudo-key so that all
>> values end up in the same partition (instead of using global(), but with
>> the same effect). Then you can use a process function to maintain the
>> state. Just make sure that your data volume is low enough, as this part is
>> not parallelizable by definition.
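A rough model of why a constant key removes parallelism: records with equal keys always land on the same parallel subtask. Flink actually hashes `key.hashCode()` into key groups and assigns key groups to subtasks; the `floorMod` below is a simplified stand-in for that, and all names are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical, simplified model of keyBy routing. floorMod stands in for
// Flink's key-group hashing but keeps the key property: equal keys go to
// the same subtask.
class KeyRoutingModel {
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Which subtasks receive at least one record under the given key selector?
    static <T> Set<Integer> busySubtasks(List<T> records, Function<T, Object> keySelector, int parallelism) {
        Set<Integer> busy = new TreeSet<>();
        for (T record : records) {
            busy.add(subtaskFor(keySelector.apply(record), parallelism));
        }
        return busy;
    }

    public static void main(String[] args) {
        List<String> users = List.of("USER-A", "USER-B", "USER-C", "USER-D", "USER-E");
        System.out.println(busySubtasks(users, u -> u, 4));     // keyed by user: spread out
        System.out.println(busySubtasks(users, u -> "ALL", 4)); // constant key: one subtask
    }
}
```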
>>
>> On Thu, May 6, 2021 at 10:09 AM Swagat Mishra <swaga...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> I will have a look at DataStream.global().
>>>
>>> Is there any other way to maintain state, e.g. by using ValueState?
>>>
>>>
>>> On Thu, 6 May 2021 at 1:26 PM, Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> If you use keyBy, then every function applied directly after it sees
>>>> only the elements with the same key. That's the expected behavior and the
>>>> basis of Flink's parallel processing capabilities.
>>>>
>>>> If you want to generate a window over all customers, you have to use a
>>>> global window. However, that also means that no parallelization can happen,
>>>> so I'd discourage that.
>>>>
>>>> A better way would be to perform as many calculations as possible in
>>>> the process function (for example, create one record per customer with
>>>> its purchase information) and then use a DataStream#global() reshuffle to
>>>> collect all aggregated information on one node.
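The pattern described above, pre-aggregating per key in parallel and only then funnelling compact results to one place, can be modelled in plain Java. In Flink, stage 1 would run in a keyed function and stage 2 after `DataStream#global()`, which sends all records to the first parallel instance of the next operator. The classes are hypothetical stand-ins, not Flink code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-stage model of "aggregate per key, then collect globally".
class TwoStageAggregation {
    static final class Purchase {
        final String customerId;
        final int quantity;

        Purchase(String customerId, int quantity) {
            this.customerId = customerId;
            this.quantity = quantity;
        }
    }

    // Stage 1 (parallel after keyBy): one summary record per customer.
    static Map<String, Integer> perCustomerTotals(List<Purchase> purchases) {
        Map<String, Integer> totals = new HashMap<>();
        for (Purchase p : purchases) {
            totals.merge(p.customerId, p.quantity, Integer::sum);
        }
        return totals;
    }

    // Stage 2 (single instance after global()): combine only the small summaries.
    static int grandTotal(Map<String, Integer> perCustomer) {
        int sum = 0;
        for (int total : perCustomer.values()) {
            sum += total;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Purchase> purchases = List.of(
                new Purchase("USER-A", 1), new Purchase("USER-B", 2), new Purchase("USER-A", 3));
        Map<String, Integer> summaries = perCustomerTotals(purchases);
        System.out.println(summaries.get("USER-A") + " " + summaries.get("USER-B")); // prints 4 2
        System.out.println(grandTotal(summaries));                                   // prints 6
    }
}
```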
>>>>
>>>> On Thu, May 6, 2021 at 9:20 AM Swagat Mishra <swaga...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you.
>>>>>
>>>>> sourceContext.collectWithTimestamp(c, c.getEventTime());
>>>>>
>>>>> Adding this to the source context worked.
>>>>>
>>>>> However, I am still getting only one customer in the process method. I
>>>>> would expect the iterable to provide all customers in the window, or do I
>>>>> have to maintain state?
>>>>>
>>>>>
>>>>> changes for reference:
>>>>>
>>>>> I made the following change and also removed any lag that I had
>>>>> introduced earlier for experimentation.
>>>>>
>>>>> So the trigger looks like:
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>             // if the watermark is already past the window fire immediately
>>>>>             return TriggerResult.FIRE;
>>>>>         } else {
>>>>>             //LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>             triggerContext.registerEventTimeTimer(customer.getEventTime());
>>>>>             return TriggerResult.FIRE;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>         return time == timeWindow.maxTimestamp() ?
>>>>>                 TriggerResult.FIRE :
>>>>>                 TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>         return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>>>>>         ctx.deleteEventTimeTimer(window.maxTimestamp());
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public boolean canMerge() {
>>>>>         return true;
>>>>>     }
>>>>>
>>>>> and removed the allowed lateness:
>>>>>
>>>>> customerStream
>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
>>>>>         //.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>         .process(new CustomAggregateFunction());
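For comparison, Flink's built-in `EventTimeTrigger` (`EventTimeTrigger.create()`) returns `CONTINUE` from `onElement` and registers a single timer at `window.maxTimestamp()`, so each window fires once when the watermark passes its end. The custom trigger above returns `FIRE` from `onElement` in both branches, so the window function runs on every element. Because session windows merge, a trigger used with them also has to override `onMerge`; the default in `Trigger` throws `UnsupportedOperationException`, and the trigger as shown does not override it. The plain-Java model below mirrors the built-in decisions; it is not Flink code and the class name is hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical plain-Java model of the decisions made by Flink's built-in
// EventTimeTrigger; timers are recorded in a set instead of a TriggerContext.
class EventTimeTriggerModel {
    enum Result { CONTINUE, FIRE }

    private final Set<Long> eventTimeTimers = new TreeSet<>();

    // onElement: fire at once if the watermark already passed the window,
    // otherwise wait for a single timer at the window's last timestamp.
    Result onElement(long windowMaxTimestamp, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        eventTimeTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    // onEventTime: only the timer for the window end fires the window.
    Result onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? Result.FIRE : Result.CONTINUE;
    }

    // onMerge (needed for session windows): re-register for the merged window.
    void onMerge(long mergedWindowMaxTimestamp, long currentWatermark) {
        if (mergedWindowMaxTimestamp > currentWatermark) {
            eventTimeTimers.add(mergedWindowMaxTimestamp);
        }
    }

    Set<Long> timers() {
        return eventTimeTimers;
    }

    public static void main(String[] args) {
        EventTimeTriggerModel trigger = new EventTimeTriggerModel();
        System.out.println(trigger.onElement(4_999L, 1_000L));   // CONTINUE, timer at 4999
        System.out.println(trigger.onEventTime(4_999L, 4_999L)); // FIRE
    }
}
```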
>>>>>
>>>>>
>>>>> On Thu, May 6, 2021 at 12:32 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Your source is not setting the timestamp with collectWithTimestamp.
>>>>>> I'm assuming that nothing really moves from the event time's perspective.
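Why a missing timestamp shows up as "late" events can be modelled roughly as follows. As far as I understand Flink's behavior, when the `WatermarkStrategy` sets no timestamp assigner (the `WaterMarkAssigner` in the first message only overrides `createWatermarkGenerator`), the record's existing timestamp is used, and a record emitted with plain `collect()` has none, which Flink passes on as `Long.MIN_VALUE`. Its session window then ends near `Long.MIN_VALUE`, so as soon as the watermark advances (the custom generator reads `getEventTime()` directly, so it does), the window counts as late and the element is dropped. Attaching the timestamp with `collectWithTimestamp`, or setting one via `WatermarkStrategy#withTimestampAssigner`, avoids this. The model is a simplification and its names are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of why records without a timestamp are
// treated as late by an event-time session window.
class MissingTimestampModel {
    // A record emitted with plain collect() carries no timestamp; the timestamp
    // assigner then receives Long.MIN_VALUE as the record timestamp.
    static long recordTimestamp(OptionalLong attachedTimestamp) {
        return attachedTimestamp.orElse(Long.MIN_VALUE);
    }

    // Event-time session window for a single element: [ts, ts + gap).
    static long sessionWindowMaxTimestamp(long timestamp, long gapMillis) {
        return timestamp + gapMillis - 1;
    }

    // Roughly the window operator's lateness check.
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLatenessMillis, long watermark) {
        return windowMaxTimestamp + allowedLatenessMillis <= watermark;
    }

    public static void main(String[] args) {
        long watermark = 1_620_297_000_000L; // some epoch-millis watermark
        long noTs = recordTimestamp(OptionalLong.empty());
        System.out.println(isWindowLate(sessionWindowMaxTimestamp(noTs, 5_000L), 5_000L, watermark)); // true
        long withTs = recordTimestamp(OptionalLong.of(watermark + 1_000L));
        System.out.println(isWindowLate(sessionWindowMaxTimestamp(withTs, 5_000L), 5_000L, watermark)); // false
    }
}
```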
>>>>>>
>>>>>> On Thu, May 6, 2021 at 8:58 AM Swagat Mishra <swaga...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, the customer generator is setting the event timestamp correctly, as
>>>>>>> shown below. I debugged and found that the events are considered late and
>>>>>>> are therefore never processed, i.e. in the window operator the method
>>>>>>> this.isWindowLate(actualWindow) evaluates to false for all events except
>>>>>>> the first, hence the events are skipped. I am not able to figure out
>>>>>>> where exactly the issue is.
>>>>>>>
>>>>>>> I have removed the evictor because I don't think I need it yet.
>>>>>>>
>>>>>>> stream looks like
>>>>>>>
>>>>>>> customerStream
>>>>>>>         .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
>>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>         .allowedLateness(Time.seconds(5))
>>>>>>>         .trigger(new EventTimeTrigger())
>>>>>>>         .process(new CustomAggregateFunction());
>>>>>>>
>>>>>>>
>>>>>>> Customer generator looks like:
>>>>>>>
>>>>>>> while (isRunning) {
>>>>>>>     Customer c = new Customer(CUSTOMER_KEY[counter % 5], LocalTime.now(), 1000); // that's the event time
>>>>>>>     System.out.println("Writing customer: " + c);
>>>>>>>     sourceContext.collect(c);
>>>>>>>     //sourceContext.emitWatermark(new Watermark(c.getEventTime()));
>>>>>>>     Thread.sleep(1000);
>>>>>>>     counter++;
>>>>>>>     if(counter % 11 == 0) {
>>>>>>>         System.out.println("Sleeping for 10 seconds");
>>>>>>>         Thread.sleep(10000);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> Custom Watermark generator has this:
>>>>>>>
>>>>>>> .....
>>>>>>> @Override
>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>> }
>>>>>>>
>>>>>>> @Override
>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>> }
>>>>>>> .....
>>>>>>>
>>>>>>> trigger looks like:
>>>>>>>
>>>>>>> ------
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public TriggerResult onElement(Customer customer, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>>         if (timeWindow.maxTimestamp() <= triggerContext.getCurrentWatermark()) {
>>>>>>>             // if the watermark is already past the window fire immediately
>>>>>>>             return TriggerResult.FIRE;
>>>>>>>         } else {
>>>>>>>             LOGGER.info("Max timestamp for customer: " + customer.getIdentifier() + " is: " + timeWindow.maxTimestamp());
>>>>>>>             triggerContext.registerEventTimeTimer(customer.getLocalTime().plusSeconds(8).toNanoOfDay());
>>>>>>>             return TriggerResult.FIRE;
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) {
>>>>>>> //        if (timeWindow.maxTimestamp() > triggerContext.getCurrentWatermark()) {
>>>>>>> //            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp());
>>>>>>> //            return TriggerResult.CONTINUE;
>>>>>>> //        }
>>>>>>>
>>>>>>>         return time == timeWindow.maxTimestamp() ?
>>>>>>>                 TriggerResult.FIRE :
>>>>>>>                 TriggerResult.CONTINUE;
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>> ....
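Flink's event-time timestamps, watermarks and timers are milliseconds since the Unix epoch, whereas `LocalTime.toNanoOfDay()`, used for the timer in the trigger above, returns nanoseconds since midnight without any date. The two values are in different units and have different origins, so such a timer will not line up with the window bounds. The comparison below uses only `java.time`; the helper names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalTime;

// Hypothetical helpers contrasting the timer value registered in the trigger
// above with an epoch-millisecond timestamp, the unit Flink's event time uses.
class TimestampUnits {
    // Nanoseconds since midnight: no date, different unit and origin.
    static long nanoOfDayPlusEightSeconds(LocalTime time) {
        return time.plusSeconds(8).toNanoOfDay();
    }

    // Milliseconds since 1970-01-01T00:00:00Z.
    static long epochMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    // An event-time timer eight seconds after an event, in epoch milliseconds.
    static long timerEightSecondsAfter(long eventTimeMillis) {
        return eventTimeMillis + 8_000L;
    }

    public static void main(String[] args) {
        System.out.println(nanoOfDayPlusEightSeconds(LocalTime.of(10, 30))); // 37808000000000
        long event = epochMillis(Instant.parse("2021-05-06T10:30:00Z"));
        System.out.println(event);                                          // 1620297000000
        System.out.println(timerEightSecondsAfter(event));                  // 1620297008000
    }
}
```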
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 6, 2021 at 12:02 PM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is your CustomerGenerator setting the event timestamp correctly?
>>>>>>>> Are your evictors evicting too early?
>>>>>>>>
>>>>>>>> You can try to add some debug output into the watermark assigner
>>>>>>>> and see if it's indeed progressing as expected.
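Following the suggestion to add debug output, here is a plain-Java model of a periodic watermark generator in the style of Flink's `WatermarkGenerator` (`onEvent` / `onPeriodicEmit`) that logs every watermark it emits. As far as I know, Flink's own `BoundedOutOfOrdernessWatermarks` uses the same `maxTimestamp - outOfOrderness - 1` formula; the class name and the log line are hypothetical. A logged watermark stuck at `Long.MIN_VALUE` means event-time windows never fire, while a watermark far ahead of the records' own timestamps makes their windows late.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a periodic bounded-out-of-orderness
// watermark generator with debug output.
class TracingWatermarkModel {
    private final long outOfOrdernessMillis;
    private final List<Long> emitted = new ArrayList<>();
    private long maxTimestamp;

    TracingWatermarkModel(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the first watermark is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long onPeriodicEmit() {
        long watermark = maxTimestamp - outOfOrdernessMillis - 1;
        emitted.add(watermark);
        System.out.println("watermark -> " + watermark); // debug output
        return watermark;
    }

    List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        TracingWatermarkModel generator = new TracingWatermarkModel(2_000L);
        generator.onEvent(10_000L);
        generator.onPeriodicEmit(); // watermark -> 7999
        generator.onEvent(9_000L);  // out of order: does not move the watermark back
        generator.onPeriodicEmit(); // watermark -> 7999
        generator.onEvent(15_000L);
        generator.onPeriodicEmit(); // watermark -> 12999
    }
}
```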
>>>>>>>>
>>>>>>>> On Thu, May 6, 2021 at 12:48 AM Swagat Mishra <swaga...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This seems to work fine in processing time but doesn't work in
>>>>>>>>> event time. Is there an issue with the way the watermark is defined,
>>>>>>>>> or do we need to set up timers?
>>>>>>>>>
>>>>>>>>> Please advise.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> WORKING:
>>>>>>>>>
>>>>>>>>> customerStream
>>>>>>>>>         .keyBy((KeySelector<Customer, String>) Customer::getIdentifier)
>>>>>>>>>         .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>         .trigger(new EventTimeTrigger()) //EventTimeTrigger.create())
>>>>>>>>>         .process(new CustomAggregateFunction());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> NOT WORKING:
>>>>>>>>>
>>>>>>>>> customerStream
>>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>>         .print();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 6, 2021 at 1:53 AM Sam <swagat....@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Adding the code for CustomWatermarkGenerator
>>>>>>>>>>
>>>>>>>>>> .....
>>>>>>>>>> @Override
>>>>>>>>>> public void onEvent(Customer customer, long l, WatermarkOutput watermarkOutput) {
>>>>>>>>>>     currentMaxTimestamp = Math.max(currentMaxTimestamp, customer.getEventTime());
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>>>>>>>>>>     watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp));
>>>>>>>>>> }
>>>>>>>>>> .....
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 6, 2021 at 1:33 AM Swagat Mishra <swaga...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> A bit of background: I have a stream of customers who have
>>>>>>>>>>> purchased some product, and I read these transactions from a Kafka
>>>>>>>>>>> topic. I want to aggregate the number of products each customer has
>>>>>>>>>>> purchased in a particular duration (say 10 seconds) and write the
>>>>>>>>>>> result to a sink.
>>>>>>>>>>>
>>>>>>>>>>> I am using session windows to achieve the above.
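How event-time session windows group one key's events can be modelled as below. As far as I know, Flink's `EventTimeSessionWindows` assigns each element the window `[timestamp, timestamp + gap)` and merges windows that intersect, where windows that only touch also count as intersecting. A session therefore has no fixed length: it stays open as long as the key keeps receiving events within the gap. With the mocked generator in this thread (5 keys, one event per second, 5-second gap), each key sees an event roughly every 5 seconds, right at the boundary of the gap, so whether consecutive events share a session can depend on a few milliseconds. The class is a hypothetical model, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of event-time session windows for one key:
// each element opens [ts, ts + gap) and intersecting windows are merged.
class SessionWindowModel {
    // Timestamps must be sorted ascending; returns {start, endExclusive} pairs.
    static List<long[]> sessions(long[] sortedTimestamps, long gapMillis) {
        List<long[]> result = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long start = ts;
            long end = ts + gapMillis;
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            // Windows that merely touch (last end == new start) are merged too.
            if (last != null && last[1] >= start) {
                last[1] = Math.max(last[1], end);
            } else {
                result.add(new long[] {start, end});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One key seeing an event every 5 s with a 5 s gap: one ever-growing session.
        System.out.println(sessions(new long[] {0, 5_000, 10_000}, 5_000).size()); // 1
        // A 10 s pause splits the sessions.
        System.out.println(sessions(new long[] {0, 5_000, 20_000}, 5_000).size()); // 2
    }
}
```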
>>>>>>>>>>>
>>>>>>>>>>> For test purposes, I have mocked up a customer stream and
>>>>>>>>>>> applied session windows as below.
>>>>>>>>>>>
>>>>>>>>>>> StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>> DataStream<Customer> customerStream = environment.addSource(new CustomerGenerator());
>>>>>>>>>>>
>>>>>>>>>>> customerStream
>>>>>>>>>>>         //.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(7)))
>>>>>>>>>>>         .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WaterMarkAssigner()))
>>>>>>>>>>>         .keyBy(Customer::getIdentifier)
>>>>>>>>>>>         .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>>>>>>>>>>>         .trigger(EventTimeTrigger.create())
>>>>>>>>>>>         .evictor(new CustomerEvictor())
>>>>>>>>>>>         .process(new CustomAggregateFunction())
>>>>>>>>>>>         .print();
>>>>>>>>>>>
>>>>>>>>>>> My watermark assigner looks like:
>>>>>>>>>>>
>>>>>>>>>>> public class WaterMarkAssigner implements WatermarkStrategy<Customer> {
>>>>>>>>>>>     static final Logger logger = LoggerFactory.getLogger(WaterMarkAssigner.class);
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public WatermarkGenerator<Customer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>>>>>>>>>>>         return new CustomWatermarkGenerator();
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I notice that the evictor and the aggregation function are called
>>>>>>>>>>> only once, for the first customer in the stream.
>>>>>>>>>>>
>>>>>>>>>>> The data stream generates a customer every second, and there are
>>>>>>>>>>> 5 customer keys for which it generates transactions.
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong with the above?
>>>>>>>>>>>
>>>>>>>>>>> I want to be able to capture each transaction as it is added to and
>>>>>>>>>>> removed from the window so that I can perform the aggregation.
>>>>>>>>>>>
>>>>>>>>>>> Please advise.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
