Hi Kenn,

Let me know if I'm missing something in the email below. I went through the
stateful processing blog posts, but that wouldn't solve the issue, since a
window cannot be split by operations done after the WindowFn. I'm wondering
what I could leverage to enforce a maximum number of events in a window that
has both a gap duration and a max duration.

Appreciate the help on this,
Jainik

On Wed, Feb 12, 2020 at 8:22 PM Jainik Vora <[email protected]> wrote:

> Thanks for noticing, Kenn. I did try a custom implementation of
> IntervalWindow that stores windowEventCount in addition to start and end,
> but it didn't work as expected. I forgot to remove that reference from the
> code sample I pasted. Below is the custom IntervalWindow and a custom coder
> for that class.
>
> public class SessionIntervalWindow extends BoundedWindow implements 
> Comparable<SessionIntervalWindow> {
>   /** Start of the interval, inclusive. */
>   private final Instant start;
>
>   /** End of the interval, exclusive. */
>   private final Instant end;
>
>   private int windowEventCount = 1;
>
>   /** Creates a new SessionIntervalWindow that represents the half-open time 
> interval [start, end). */
>   public SessionIntervalWindow(Instant start, Instant end) {
>     this.start = start;
>     this.end = end;
>   }
>
>   public SessionIntervalWindow(Instant start, Instant end, int 
> windowEventCount) {
>     this.start = start;
>     this.end = end;
>     this.windowEventCount = windowEventCount;
>   }
>
>   public SessionIntervalWindow(Instant start, ReadableDuration size) {
>     this.start = start;
>     this.end = start.plus(size);
> //    this.windowEventCount = windowEventCount;
>   }
>
>   /** Returns the start of this window, inclusive. */
>   public Instant start() {
>     return start;
>   }
>
>   /** Returns the end of this window, exclusive. */
>   public Instant end() {
>     return end;
>   }
>
>   public int getWindowEventCount() {
>     return this.windowEventCount;
>   }
>
>   public void incrementWindowEventCountBy(int increment) {
>     this.windowEventCount += increment;
>   }
>
>   /** Returns the largest timestamp that can be included in this window. */
>   @Override
>   public Instant maxTimestamp() {
>     // end not inclusive
>     return end.minus(1);
>   }
>
>   /** Returns whether this window contains the given window. */
>   public boolean contains(SessionIntervalWindow other) {
>     return !this.start.isAfter(other.start) && !this.end.isBefore(other.end);
>   }
>
>   /** Returns whether this window is disjoint from the given window. */
>   public boolean isDisjoint(SessionIntervalWindow other) {
>     return !this.end.isAfter(other.start) || !other.end.isAfter(this.start);
>   }
>
>   /** Returns whether this window intersects the given window. */
>   public boolean intersects(SessionIntervalWindow other) {
>     return !isDisjoint(other);
>   }
>
>   /** Returns the minimal window that includes both this window and the given 
> window. */
>   public SessionIntervalWindow span(SessionIntervalWindow other) {
>     return new SessionIntervalWindow(
>             new Instant(Math.min(start.getMillis(), other.start.getMillis())),
>             new Instant(Math.max(end.getMillis(), other.end.getMillis())));
>   }
>
>   @Override
>   public boolean equals(Object o) {
>     return (o instanceof SessionIntervalWindow)
>             && ((SessionIntervalWindow) o).end.isEqual(end)
>             && ((SessionIntervalWindow) o).start.isEqual(start);
>   }
>
>   @Override
>   public int hashCode() {
>     // The end values are themselves likely to be arithmetic sequence, which
>     // is a poor distribution to use for a hashtable, so we
>     // add a highly non-linear transformation.
>     return (int) (start.getMillis() + modInverse((int) (end.getMillis() << 1) 
> + 1));
>   }
>
>   /** Compute the inverse of (odd) x mod 2^32. */
>   private int modInverse(int x) {
>     // Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.
>     int inverse = x * x * x;
>     // Newton iteration doubles correct bits at each step.
>     inverse *= 2 - x * inverse;
>     inverse *= 2 - x * inverse;
>     inverse *= 2 - x * inverse;
>     return inverse;
>   }
>
>   @Override
>   public String toString() {
>     return "[" + start + ".." + end + "), "+ windowEventCount + " )";
>   }
>
>
>
>   @Override
>   public int compareTo(SessionIntervalWindow o) {
>     if (start.isEqual(o.start)) {
>       return end.compareTo(o.end);
>     }
>     return start.compareTo(o.start);
>   }
>
>   /** Returns a {@link Coder} suitable for {@link SessionIntervalWindow}. */
>   public static Coder<SessionIntervalWindow> getCoder() {
>     return SessionIntervalWindow.IntervalWindowCoder.of();
>   }
>
>   /** Encodes an {@link SessionIntervalWindow} as a pair of its upper bound 
> and duration. */
>   public static class IntervalWindowCoder extends 
> StructuredCoder<SessionIntervalWindow> {
>
>     private static final SessionIntervalWindow.IntervalWindowCoder INSTANCE = 
> new SessionIntervalWindow.IntervalWindowCoder();
>
>     private static final Coder<Instant> instantCoder = InstantCoder.of();
>     private static final Coder<ReadableDuration> durationCoder = 
> DurationCoder.of();
>     private static final Coder<Integer> integerCoder = VarIntCoder.of();
>
>     public static SessionIntervalWindow.IntervalWindowCoder of() {
>       return INSTANCE;
>     }
>
>     @Override
>     public void encode(SessionIntervalWindow window, OutputStream outStream)
>             throws IOException, CoderException {
>       instantCoder.encode(window.end, outStream);
>       durationCoder.encode(new Duration(window.start, window.end), outStream);
>       integerCoder.encode(window.windowEventCount, outStream);
>     }
>
>     @Override
>     public SessionIntervalWindow decode(InputStream inStream) throws 
> IOException, CoderException {
>       Instant end = instantCoder.decode(inStream);
>       ReadableDuration duration = durationCoder.decode(inStream);
>       int windowEventCount = integerCoder.decode(inStream);
>       return new SessionIntervalWindow(end.minus(duration), end, 
> windowEventCount);
>     }
>
>     @Override
>     public void verifyDeterministic() throws NonDeterministicException {
>       instantCoder.verifyDeterministic();
>       durationCoder.verifyDeterministic();
>     }
>
>     @Override
>     public boolean consistentWithEquals() {
>       return instantCoder.consistentWithEquals() && 
> durationCoder.consistentWithEquals();
>     }
>
>     @Override
>     public List<? extends Coder<?>> getCoderArguments() {
>       return Collections.emptyList();
>     }
>   }
> }
>
>
> On Wed, Feb 12, 2020 at 8:11 PM Kenneth Knowles <[email protected]> wrote:
>
>> I notice that you use the name "IntervalWindow" but you are calling
>> methods that IntervalWindow does not have. Do you have a custom
>> implementation of this class? Do you have a custom coder for your version
>> of IntervalWindow?
>>
>> Kenn
>>
>> On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <[email protected]>
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I am trying to implement sessions on events with three criteria:
>>> 1. Gap duration - e.g. 10 mins
>>> 2. Max duration - e.g. 1 hour
>>> 3. Max events - e.g. 500 events
>>>
>>> I’m able to implement 1 and 2 by implementing a custom BoundedWindow that
>>> keeps track of window size and max duration, but I’m having difficulty
>>> implementing the 3rd criterion, which is that a session should have a
>>> maximum number of events. I’m trying to implement this by tracking the
>>> number of events in a window, but while testing I noticed that
>>> mergeWindows is called every 3 seconds, and after mergeWindows is
>>> executed, the windows in that merge are lost, and so is the metadata
>>> about the number of events seen in those windows. Any examples or
>>> pointers on how to implement a session with a max element/event count
>>> would be helpful. Below is the code for the custom WindowFn I implemented:
>>>
>>> public class UserSessions extends WindowFn<KV<String, Event>, 
>>> IntervalWindow> {
>>>   private final Duration gapDuration;
>>>   private Duration maxSize;
>>>   private static final Duration DEFAULT_SIZE_DURATION = 
>>> Duration.standardHours(12L);
>>>   public UserSessions(Duration gapDuration, Duration sizeDuration) {
>>>     this.gapDuration = gapDuration;
>>>     this.maxSize = sizeDuration;
>>>   }
>>>   public static UserSessions withGapDuration(Duration gapDuration) {
>>>     return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>>>   }
>>>   public UserSessions withMaxSize(Duration maxSize) {
>>>     this.maxSize = maxSize;
>>>     return this;
>>>   }
>>>   @Override
>>>   public Collection<IntervalWindow> assignWindows(AssignContext 
>>> assignContext) throws Exception {
>>>     return Arrays.asList(new IntervalWindow(assignContext.timestamp(), 
>>> gapDuration));
>>>   }
>>>   private Duration windowSize(IntervalWindow window) {
>>>     return window == null
>>>             ? new Duration(0)
>>>             : new Duration(window.start(), window.end());
>>>   }
>>>   @Override
>>>   public void mergeWindows(MergeContext mergeContext) throws Exception {
>>>     List<IntervalWindow> sortedWindows = new ArrayList<>();
>>>     for (IntervalWindow window : mergeContext.windows()) {
>>>       sortedWindows.add(window);
>>>     }
>>>     Collections.sort(sortedWindows);
>>>     List<MergeCandidate> merges = new ArrayList<>();
>>>     MergeCandidate current = new MergeCandidate();
>>>     for (IntervalWindow window : sortedWindows) {
>>>       MergeCandidate next = new MergeCandidate(window);
>>>       if (current.intersects(window)) {
>>>         current.add(window);
>>>         Duration currentWindow = windowSize(current.union);
>>>         if (currentWindow.isShorterThan(maxSize) || 
>>> currentWindow.isEqual(maxSize) || current.size() < 10)
>>>           continue;
>>>         // Current window exceeds bounds, so flush and move to next
>>>         LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never 
>>> hits.
>>>         next = new MergeCandidate();
>>>       }
>>>       merges.add(current);
>>>       current = next;
>>>     }
>>>     merges.add(current);
>>>     for (MergeCandidate merge : merges) {
>>>       merge.apply(mergeContext);
>>>     }
>>>   }
>>>   @Override
>>>   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>>>     throw new UnsupportedOperationException("Sessions is not allowed in 
>>> side inputs");
>>>   }
>>>   @Override
>>>   public boolean isCompatible(WindowFn<?, ?> other) {
>>>     return false;
>>>   }
>>>   @Override
>>>   public Coder<IntervalWindow> windowCoder() {
>>>     return IntervalWindow.getCoder();
>>>   }
>>>   private static class MergeCandidate {
>>>     @Nullable
>>>     private IntervalWindow union;
>>>     private final List<IntervalWindow> parts;
>>>     public MergeCandidate() {
>>>       union = null;
>>>       parts = new ArrayList<>();
>>>     }
>>>     public MergeCandidate(IntervalWindow window) {
>>>       union = window;
>>>       parts = new ArrayList<>(Arrays.asList(window));
>>>     }
>>>     public boolean intersects(IntervalWindow window) {
>>>       return union == null || union.intersects(window);
>>>     }
>>>     public void add(IntervalWindow window) {
>>>       union = union == null ? window : union.span(window);
>>>       union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>>>       parts.add(window);
>>>     }
>>>     public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws 
>>> Exception {
>>>       if (this.parts.size() > 1) {
>>>         c.merge(parts, union);
>>>       }
>>>     }
>>>     public int size() {
>>>       return this.parts.size();
>>>     }
>>>     @Override
>>>     public String toString() {
>>>       return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
>>>     }
>>>   }
>>> }
>>>
>>> Thanks & Regards,
>>>
>>> *Jainik Vora*
>>>
>>>
>
>

-- 


Thanks & Regards,

*Jainik Vora*

Reply via email to