Hi Kenn, Let me know if I'm missing something in the email below. I was going through the stateful processing blog posts, but that wouldn't solve the issue, since a window cannot be split by operations performed after the WindowFn. I'm wondering what I could leverage to enforce a maximum number of events in a window that has both a gap duration and a max duration.
Appreciate the help on this, Jainik On Wed, Feb 12, 2020 at 8:22 PM Jainik Vora <[email protected]> wrote: > Thanks for noticing, Kenn. I did try a custom implementation of > IntervalWindow storing windowEventCount in addition to start and end, but it > didn't work as expected. I forgot to remove that reference from the code sample I > pasted. Below are the custom IntervalWindow and a custom coder for that > class. > > public class SessionIntervalWindow extends BoundedWindow implements > Comparable<SessionIntervalWindow> { > /** Start of the interval, inclusive. */ > private final Instant start; > > /** End of the interval, exclusive. */ > private final Instant end; > > private int windowEventCount = 1; > > /** Creates a new SessionIntervalWindow that represents the half-open time > interval [start, end). */ > public SessionIntervalWindow(Instant start, Instant end) { > this.start = start; > this.end = end; > } > > public SessionIntervalWindow(Instant start, Instant end, int > windowEventCount) { > this.start = start; > this.end = end; > this.windowEventCount = windowEventCount; > } > > public SessionIntervalWindow(Instant start, ReadableDuration size) { > this.start = start; > this.end = start.plus(size); > // this.windowEventCount = windowEventCount; > } > > /** Returns the start of this window, inclusive. */ > public Instant start() { > return start; > } > > /** Returns the end of this window, exclusive. */ > public Instant end() { > return end; > } > > public int getWindowEventCount() { > return this.windowEventCount; > } > > public void incrementWindowEventCountBy(int increment) { > this.windowEventCount += increment; > } > > /** Returns the largest timestamp that can be included in this window. */ > @Override > public Instant maxTimestamp() { > // end not inclusive > return end.minus(1); > } > > /** Returns whether this window contains the given window. 
*/ > public boolean contains(SessionIntervalWindow other) { > return !this.start.isAfter(other.start) && !this.end.isBefore(other.end); > } > > /** Returns whether this window is disjoint from the given window. */ > public boolean isDisjoint(SessionIntervalWindow other) { > return !this.end.isAfter(other.start) || !other.end.isAfter(this.start); > } > > /** Returns whether this window intersects the given window. */ > public boolean intersects(SessionIntervalWindow other) { > return !isDisjoint(other); > } > > /** Returns the minimal window that includes both this window and the given > window. */ > public SessionIntervalWindow span(SessionIntervalWindow other) { > return new SessionIntervalWindow( > new Instant(Math.min(start.getMillis(), other.start.getMillis())), > new Instant(Math.max(end.getMillis(), other.end.getMillis()))); > } > > @Override > public boolean equals(Object o) { > return (o instanceof SessionIntervalWindow) > && ((SessionIntervalWindow) o).end.isEqual(end) > && ((SessionIntervalWindow) o).start.isEqual(start); > } > > @Override > public int hashCode() { > // The end values are themselves likely to be arithmetic sequence, which > // is a poor distribution to use for a hashtable, so we > // add a highly non-linear transformation. > return (int) (start.getMillis() + modInverse((int) (end.getMillis() << 1) > + 1)); > } > > /** Compute the inverse of (odd) x mod 2^32. */ > private int modInverse(int x) { > // Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x. > int inverse = x * x * x; > // Newton iteration doubles correct bits at each step. > inverse *= 2 - x * inverse; > inverse *= 2 - x * inverse; > inverse *= 2 - x * inverse; > return inverse; > } > > @Override > public String toString() { > return "[" + start + ".." 
+ end + "), "+ windowEventCount + " )"; > } > > > > @Override > public int compareTo(SessionIntervalWindow o) { > if (start.isEqual(o.start)) { > return end.compareTo(o.end); > } > return start.compareTo(o.start); > } > > /** Returns a {@link Coder} suitable for {@link SessionIntervalWindow}. */ > public static Coder<SessionIntervalWindow> getCoder() { > return SessionIntervalWindow.IntervalWindowCoder.of(); > } > > /** Encodes an {@link SessionIntervalWindow} as a pair of its upper bound > and duration. */ > public static class IntervalWindowCoder extends > StructuredCoder<SessionIntervalWindow> { > > private static final SessionIntervalWindow.IntervalWindowCoder INSTANCE = > new SessionIntervalWindow.IntervalWindowCoder(); > > private static final Coder<Instant> instantCoder = InstantCoder.of(); > private static final Coder<ReadableDuration> durationCoder = > DurationCoder.of(); > private static final Coder<Integer> integerCoder = VarIntCoder.of(); > > public static SessionIntervalWindow.IntervalWindowCoder of() { > return INSTANCE; > } > > @Override > public void encode(SessionIntervalWindow window, OutputStream outStream) > throws IOException, CoderException { > instantCoder.encode(window.end, outStream); > durationCoder.encode(new Duration(window.start, window.end), outStream); > integerCoder.encode(window.windowEventCount, outStream); > } > > @Override > public SessionIntervalWindow decode(InputStream inStream) throws > IOException, CoderException { > Instant end = instantCoder.decode(inStream); > ReadableDuration duration = durationCoder.decode(inStream); > int windowEventCount = integerCoder.decode(inStream); > return new SessionIntervalWindow(end.minus(duration), end, > windowEventCount); > } > > @Override > public void verifyDeterministic() throws NonDeterministicException { > instantCoder.verifyDeterministic(); > durationCoder.verifyDeterministic(); > } > > @Override > public boolean consistentWithEquals() { > return instantCoder.consistentWithEquals() 
&& > durationCoder.consistentWithEquals(); > } > > @Override > public List<? extends Coder<?>> getCoderArguments() { > return Collections.emptyList(); > } > } > } > > > On Wed, Feb 12, 2020 at 8:11 PM Kenneth Knowles <[email protected]> wrote: > >> I notice that you use the name "IntervalWindow" but you are calling >> methods that IntervalWindow does not have. Do you have a custom >> implementation of this class? Do you have a custom coder for your version >> of IntervalWindow? >> >> Kenn >> >> On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <[email protected]> >> wrote: >> >>> Hi Everyone, >>> >>> I am trying to implement sessions on events with three criteria: >>> 1. Gap duration - e.g. 10 mins >>> 2. Max duration - e.g. 1 hour >>> 3. Max events - e.g. 500 events >>> >>> I’m able to implement 1 and 2 by >>> implementing a custom BoundedWindow that keeps track of window size and max >>> duration. But I’m having difficulty implementing the 3rd criterion, which is that a >>> session should have a maximum number of events. I’m trying to implement >>> this by tracking the number of events in a window, but while testing I noticed >>> that mergeWindows is called every 3 seconds, and after mergeWindows is >>> executed, the windows in that merge are lost, and so is the metadata about the number of >>> events seen in each of those windows. Any examples or pointers would be helpful on >>> how to implement a session with a maximum element/event count.
Below is the >>> code of the custom WindowFn I implemented: >>> >>> public class UserSessions extends WindowFn<KV<String, Event>, >>> IntervalWindow> { >>> private final Duration gapDuration; >>> private Duration maxSize; >>> private static final Duration DEFAULT_SIZE_DURATION = >>> Duration.standardHours(12L); >>> public UserSessions(Duration gapDuration, Duration sizeDuration) { >>> this.gapDuration = gapDuration; >>> this.maxSize = sizeDuration; >>> } >>> public static UserSessions withGapDuration(Duration gapDuration) { >>> return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION); >>> } >>> public UserSessions withMaxSize(Duration maxSize) { >>> this.maxSize = maxSize; >>> return this; >>> } >>> @Override >>> public Collection<IntervalWindow> assignWindows(AssignContext >>> assignContext) throws Exception { >>> return Arrays.asList(new IntervalWindow(assignContext.timestamp(), >>> gapDuration)); >>> } >>> private Duration windowSize(IntervalWindow window) { >>> return window == null >>> ? new Duration(0) >>> : new Duration(window.start(), window.end()); >>> } >>> @Override >>> public void mergeWindows(MergeContext mergeContext) throws Exception { >>> List<IntervalWindow> sortedWindows = new ArrayList<>(); >>> for (IntervalWindow window : mergeContext.windows()) { >>> sortedWindows.add(window); >>> } >>> Collections.sort(sortedWindows); >>> List<MergeCandidate> merges = new ArrayList<>(); >>> MergeCandidate current = new MergeCandidate(); >>> for (IntervalWindow window : sortedWindows) { >>> MergeCandidate next = new MergeCandidate(window); >>> if (current.intersects(window)) { >>> current.add(window); >>> Duration currentWindow = windowSize(current.union); >>> if (currentWindow.isShorterThan(maxSize) || >>> currentWindow.isEqual(maxSize) || current.size() < 10) >>> continue; >>> // Current window exceeds bounds, so flush and move to next >>> LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never >>> hits.
>>> next = new MergeCandidate(); >>> } >>> merges.add(current); >>> current = next; >>> } >>> merges.add(current); >>> for (MergeCandidate merge : merges) { >>> merge.apply(mergeContext); >>> } >>> } >>> @Override >>> public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { >>> throw new UnsupportedOperationException("Sessions is not allowed in >>> side inputs"); >>> } >>> @Override >>> public boolean isCompatible(WindowFn<?, ?> other) { >>> return false; >>> } >>> @Override >>> public Coder<IntervalWindow> windowCoder() { >>> return IntervalWindow.getCoder(); >>> } >>> private static class MergeCandidate { >>> @Nullable >>> private IntervalWindow union; >>> private final List<IntervalWindow> parts; >>> public MergeCandidate() { >>> union = null; >>> parts = new ArrayList<>(); >>> } >>> public MergeCandidate(IntervalWindow window) { >>> union = window; >>> parts = new ArrayList<>(Arrays.asList(window)); >>> } >>> public boolean intersects(IntervalWindow window) { >>> return union == null || union.intersects(window); >>> } >>> public void add(IntervalWindow window) { >>> union = union == null ? window : union.span(window); >>> union.incrementWindowEventCountBy(window.getWindowEventCount() + 1); >>> parts.add(window); >>> } >>> public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws >>> Exception { >>> if (this.parts.size() > 1) { >>> c.merge(parts, union); >>> } >>> } >>> public int size() { >>> return this.parts.size(); >>> } >>> @Override >>> public String toString() { >>> return "MergeCandidate[union=" + union + ", parts=" + parts + "]"; >>> } >>> } >>> } >>> >>> Thanks & Regards, >>> >>> *Jainik Vora* >>> >>> > > -- Thanks & Regards, *Jainik Vora*
