I notice that you use the name "IntervalWindow" but you are calling methods
that IntervalWindow does not have. Do you have a custom implementation of
this class? Do you have a custom coder for your version of IntervalWindow?

Kenn

On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <[email protected]> wrote:

> Hi Everyone,
>
> I am trying to implement session on events with three criteria
> 1. Gap Duration - eg. 10 mins
> 2. Max duration - eg. 1 hour
> 3. Max events - eg. 500 eventsI’m able to implement 1 and 2 by
> implementing a custom BoundedWindow keeping track of window size and max
> duration. But I’m having difficulty implementing 3rd criteria which is - a
> session should have maximum number of events.I’m trying to implement this
> by tracking number of events in a window but while testing I noticed that
> mergeWindows is called every 3 seconds and after mergeWindows is executed,
> windows in that merge is lost, so is the metadata of number of events seen
> in that window.Any example of pointers would be helpful on how to
> implement a session with max element/event count. Below is the code I
> implemented a custom WindowFn:
>
> public class UserSessions extends WindowFn<KV<String, Event>, IntervalWindow> 
> {
>   private final Duration gapDuration;
>   private Duration maxSize;
>   private static final Duration DEFAULT_SIZE_DURATION = 
> Duration.standardHours(12L);
>   public UserSessions(Duration gapDuration, Duration sizeDuration) {
>     this.gapDuration = gapDuration;
>     this.maxSize = sizeDuration;
>   }
>   public static UserSessions withGapDuration(Duration gapDuration) {
>     return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>   }
>   public UserSessions withMaxSize(Duration maxSize) {
>     this.maxSize = maxSize;
>     return this;
>   }
>   @Override
>   public Collection<IntervalWindow> assignWindows(AssignContext 
> assignContext) throws Exception {
>     return Arrays.asList(new IntervalWindow(assignContext.timestamp(), 
> gapDuration));
>   }
>   private Duration windowSize(IntervalWindow window) {
>     return window == null
>             ? new Duration(0)
>             : new Duration(window.start(), window.end());
>   }
>   @Override
>   public void mergeWindows(MergeContext mergeContext) throws Exception {
>     List<IntervalWindow> sortedWindows = new ArrayList<>();
>     for (IntervalWindow window : mergeContext.windows()) {
>       sortedWindows.add(window);
>     }
>     Collections.sort(sortedWindows);
>     List<MergeCandidate> merges = new ArrayList<>();
>     MergeCandidate current = new MergeCandidate();
>     for (IntervalWindow window : sortedWindows) {
>       MergeCandidate next = new MergeCandidate(window);
>       if (current.intersects(window)) {
>         current.add(window);
>         Duration currentWindow = windowSize(current.union);
>         if (currentWindow.isShorterThan(maxSize) || 
> currentWindow.isEqual(maxSize) || current.size() < 10)
>           continue;
>         // Current window exceeds bounds, so flush and move to next
>         LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never 
> hits.
>         next = new MergeCandidate();
>       }
>       merges.add(current);
>       current = next;
>     }
>     merges.add(current);
>     for (MergeCandidate merge : merges) {
>       merge.apply(mergeContext);
>     }
>   }
>   @Override
>   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>     throw new UnsupportedOperationException("Sessions is not allowed in side 
> inputs");
>   }
>   @Override
>   public boolean isCompatible(WindowFn<?, ?> other) {
>     return false;
>   }
>   @Override
>   public Coder<IntervalWindow> windowCoder() {
>     return IntervalWindow.getCoder();
>   }
>   private static class MergeCandidate {
>     @Nullable
>     private IntervalWindow union;
>     private final List<IntervalWindow> parts;
>     public MergeCandidate() {
>       union = null;
>       parts = new ArrayList<>();
>     }
>     public MergeCandidate(IntervalWindow window) {
>       union = window;
>       parts = new ArrayList<>(Arrays.asList(window));
>     }
>     public boolean intersects(IntervalWindow window) {
>       return union == null || union.intersects(window);
>     }
>     public void add(IntervalWindow window) {
>       union = union == null ? window : union.span(window);
>       union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>       parts.add(window);
>     }
>     public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws 
> Exception {
>       if (this.parts.size() > 1) {
>         c.merge(parts, union);
>       }
>     }
>     public int size() {
>       return this.parts.size();
>     }
>     @Override
>     public String toString() {
>       return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
>     }
>   }
> }
>
> Thanks & Regards,
>
> *Jainik Vora*
>
>

Reply via email to