Thanks for noticing, Kenn. I did try a custom implementation of
IntervalWindow that stores windowEventCount in addition to start and end,
but it didn't work as expected. I forgot to remove the references to it
from the code sample I pasted. Below is the custom IntervalWindow
(SessionIntervalWindow) and a custom coder for that class.
public class SessionIntervalWindow extends BoundedWindow implements
Comparable<SessionIntervalWindow> {
/** Start of the interval, inclusive. */
private final Instant start;
/** End of the interval, exclusive. */
private final Instant end;
private int windowEventCount = 1;
/** Creates a new SessionIntervalWindow that represents the
half-open time interval [start, end). */
public SessionIntervalWindow(Instant start, Instant end) {
this.start = start;
this.end = end;
}
public SessionIntervalWindow(Instant start, Instant end, int
windowEventCount) {
this.start = start;
this.end = end;
this.windowEventCount = windowEventCount;
}
public SessionIntervalWindow(Instant start, ReadableDuration size) {
this.start = start;
this.end = start.plus(size);
// this.windowEventCount = windowEventCount;
}
/** Returns the start of this window, inclusive. */
public Instant start() {
return start;
}
/** Returns the end of this window, exclusive. */
public Instant end() {
return end;
}
public int getWindowEventCount() {
return this.windowEventCount;
}
public void incrementWindowEventCountBy(int increment) {
this.windowEventCount += increment;
}
/** Returns the largest timestamp that can be included in this window. */
@Override
public Instant maxTimestamp() {
// end not inclusive
return end.minus(1);
}
/** Returns whether this window contains the given window. */
public boolean contains(SessionIntervalWindow other) {
return !this.start.isAfter(other.start) && !this.end.isBefore(other.end);
}
/** Returns whether this window is disjoint from the given window. */
public boolean isDisjoint(SessionIntervalWindow other) {
return !this.end.isAfter(other.start) || !other.end.isAfter(this.start);
}
/** Returns whether this window intersects the given window. */
public boolean intersects(SessionIntervalWindow other) {
return !isDisjoint(other);
}
/** Returns the minimal window that includes both this window and
the given window. */
public SessionIntervalWindow span(SessionIntervalWindow other) {
return new SessionIntervalWindow(
new Instant(Math.min(start.getMillis(), other.start.getMillis())),
new Instant(Math.max(end.getMillis(), other.end.getMillis())));
}
@Override
public boolean equals(Object o) {
return (o instanceof SessionIntervalWindow)
&& ((SessionIntervalWindow) o).end.isEqual(end)
&& ((SessionIntervalWindow) o).start.isEqual(start);
}
@Override
public int hashCode() {
// The end values are themselves likely to be arithmetic sequence, which
// is a poor distribution to use for a hashtable, so we
// add a highly non-linear transformation.
return (int) (start.getMillis() + modInverse((int)
(end.getMillis() << 1) + 1));
}
/** Compute the inverse of (odd) x mod 2^32. */
private int modInverse(int x) {
// Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.
int inverse = x * x * x;
// Newton iteration doubles correct bits at each step.
inverse *= 2 - x * inverse;
inverse *= 2 - x * inverse;
inverse *= 2 - x * inverse;
return inverse;
}
@Override
public String toString() {
return "[" + start + ".." + end + "), count=" + windowEventCount;
}
@Override
public int compareTo(SessionIntervalWindow o) {
if (start.isEqual(o.start)) {
return end.compareTo(o.end);
}
return start.compareTo(o.start);
}
/** Returns a {@link Coder} suitable for {@link SessionIntervalWindow}. */
public static Coder<SessionIntervalWindow> getCoder() {
return SessionIntervalWindow.IntervalWindowCoder.of();
}
/** Encodes a {@link SessionIntervalWindow} as its upper bound, its
duration, and its event count. */
public static class IntervalWindowCoder extends
StructuredCoder<SessionIntervalWindow> {
private static final SessionIntervalWindow.IntervalWindowCoder
INSTANCE = new SessionIntervalWindow.IntervalWindowCoder();
private static final Coder<Instant> instantCoder = InstantCoder.of();
private static final Coder<ReadableDuration> durationCoder =
DurationCoder.of();
private static final Coder<Integer> integerCoder = VarIntCoder.of();
public static SessionIntervalWindow.IntervalWindowCoder of() {
return INSTANCE;
}
@Override
public void encode(SessionIntervalWindow window, OutputStream outStream)
throws IOException, CoderException {
instantCoder.encode(window.end, outStream);
durationCoder.encode(new Duration(window.start, window.end), outStream);
integerCoder.encode(window.windowEventCount, outStream);
}
@Override
public SessionIntervalWindow decode(InputStream inStream) throws
IOException, CoderException {
Instant end = instantCoder.decode(inStream);
ReadableDuration duration = durationCoder.decode(inStream);
int windowEventCount = integerCoder.decode(inStream);
return new SessionIntervalWindow(end.minus(duration), end,
windowEventCount);
}
@Override
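public void verifyDeterministic() throws NonDeterministicException {
instantCoder.verifyDeterministic();
durationCoder.verifyDeterministic();
// The event count is part of the encoding, so check its coder too.
integerCoder.verifyDeterministic();
}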
@Override
public boolean consistentWithEquals() {
return instantCoder.consistentWithEquals() &&
durationCoder.consistentWithEquals();
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
return Collections.emptyList();
}
}
}
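
Note that span() above builds the union with the two-argument constructor,
so the merged window falls back to the default windowEventCount of 1. For
reference, here is a minimal standalone sketch (plain Java, no Beam) of
carrying the count through a merge and starting a new session once a cap
would be exceeded. The names CountingSpanSketch, Win, merge and maxEvents
are hypothetical and exist only for illustration. It shows the bookkeeping
only; it makes no claim about how a runner treats this metadata between
mergeWindows calls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CountingSpanSketch {
  /** Hypothetical stand-in for SessionIntervalWindow: [start, end) in millis plus a count. */
  static final class Win {
    final long start;
    final long end;
    final int count;

    Win(long start, long end, int count) {
      this.start = start;
      this.end = end;
      this.count = count;
    }

    boolean intersects(Win other) {
      return end > other.start && other.end > start;
    }

    /** Minimal window covering both inputs; the union keeps the sum of both counts. */
    Win span(Win other) {
      return new Win(
          Math.min(start, other.start), Math.max(end, other.end), count + other.count);
    }
  }

  /** Greedily merges overlapping windows, closing a session when a merge would exceed maxEvents. */
  static List<Win> merge(List<Win> windows, int maxEvents) {
    List<Win> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((Win w) -> w.start).thenComparingLong(w -> w.end));
    List<Win> sessions = new ArrayList<>();
    Win current = null;
    for (Win w : sorted) {
      if (current != null && current.intersects(w) && current.count + w.count <= maxEvents) {
        current = current.span(w);
      } else {
        if (current != null) {
          sessions.add(current);
        }
        current = w;
      }
    }
    if (current != null) {
      sessions.add(current);
    }
    return sessions;
  }

  public static void main(String[] args) {
    // Five single-event windows, one second apart, each with a 10 second gap duration.
    List<Win> input = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      input.add(new Win(i * 1000L, i * 1000L + 10_000L, 1));
    }
    for (Win w : merge(input, 3)) {
      System.out.println("[" + w.start + ", " + w.end + ") count=" + w.count);
    }
  }
}
```

With maxEvents = 3 this prints two sessions, [0, 12000) with count=3 and
[3000, 14000) with count=2. The two sessions still overlap in time, which a
real WindowFn would also have to account for.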
On Wed, Feb 12, 2020 at 8:11 PM Kenneth Knowles wrote:
> I notice that you use the name "IntervalWindow" but you are calling
> methods that IntervalWindow does not have. Do you have a custom
> implementation of this class? Do you have a custom coder for your version
> of IntervalWindow?
>
> Kenn
>
> On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora wrote:
>
>> Hi Everyone,
>>
>> I am trying to implement sessions on events with three criteria:
>> 1. Gap duration - e.g. 10 mins
>> 2. Max duration - e.g. 1 hour
>> 3. Max events - e.g. 500 events
>>
>> I’m able to implement 1 and 2 by implementing a custom BoundedWindow
>> that keeps track of window size and max duration. But I’m having
>> difficulty implementing the 3rd criterion: a session should have a
>> maximum number of events.
>>
>> I’m trying to implement this by tracking the number of events in a
>> window, but while testing I noticed that mergeWindows is called every
>> 3 seconds, and after mergeWindows is executed, the windows in that
>> merge are lost, and so is the metadata about the number of events seen
>> in those windows.
>>
>> Any examples or pointers would be helpful on how to implement a session
>> with a max element/event count. Below is the code where I implemented a
>> custom WindowFn:
>>
>> public class UserSessions extends WindowFn<KV<String, Event>,
>> IntervalWindow> {
>> private final Duration gapDuration;
>> private Duration maxSize;
>> private static final Duration DEFAULT_SIZE_DURATION =
>> Duration.standardHours(12L);
>> public UserSessions(Duration gapDuration, Duration sizeDuration) {
>> this.gapDuration = gapDuration;
>> this.maxSize = sizeDuration;
>> }
>> public static UserSessions withGapDuration(Duration gapDuration) {
>> return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>> }
>> public UserSessions withMaxSize(Duration maxSize) {
>> this.maxSize = maxSize;
>> return this;
>> }
>> @Override
>> public Collection<IntervalWindow> assignWindows(AssignContext
>> assignContext) throws Exception {
>> return Arrays.asList(new IntervalWindow(assignContext.timestamp(),
>> gapDuration));
>> }
>> private Duration windowSize(IntervalWindow window) {
>> return window == null
>> ? new Duration(0)
>> : new Duration(window.start(), window.end());
>> }
>> @Override
>> public void mergeWindows(MergeContext mergeContext) throws Exception {
>> List<IntervalWindow> sortedWindows = new ArrayList<>();
>> for (IntervalWindow window : mergeContext.windows()) {
>> sortedWindows.add(window);
>> }
>> Collections.sort(sortedWindows);
>> List<MergeCandidate> merges = new ArrayList<>();
>> MergeCandidate current = new MergeCandidate();
>> for (IntervalWindow window : sortedWindows) {
>> MergeCandidate next = new MergeCandidate(window);
>> if (current.intersects(window)) {
>> current.add(window);
>> Duration currentWindow = windowSize(current.union);
>> if (currentWindow.isShorterThan(maxSize) ||
>> currentWindow.isEqual(maxSize) || current.size() < 10)
>> continue;
>> // Current window exceeds bounds, so flush and move to next
>> // This never hits.
>> LOG.info("********** EXCEEDS 10 Events CRITERIA.");
>> next = new MergeCandidate();
>> }
>> merges.add(current);
>> current = next;
>> }
>> merges.add(current);
>> for (MergeCandidate merge : merges) {
>> merge.apply(mergeContext);
>> }
>> }
>> @Override
>> public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>> throw new UnsupportedOperationException(
>> "Sessions is not allowed in side inputs");
>> }
>> @Override
>> public boolean isCompatible(WindowFn<?, ?> other) {
>> return false;
>> }
>> @Override
>> public Coder<IntervalWindow> windowCoder() {
>> return IntervalWindow.getCoder();
>> }
>> private static class MergeCandidate {
>> @Nullable
>> private IntervalWindow union;
>> private final List<IntervalWindow> parts;
>> public MergeCandidate() {
>> union = null;
>> parts = new ArrayList<>();
>> }
>> public MergeCandidate(IntervalWindow window) {
>> union = window;
>> parts = new ArrayList<>(Arrays.asList(window));
>> }
>> public boolean intersects(IntervalWindow window) {
>> return union == null || union.intersects(window);
>> }
>> public void add(IntervalWindow window) {
>> union = union == null ? window : union.span(window);
>> union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>> parts.add(window);
>> }
>> public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws
>> Exception {
>> if (this.parts.size() > 1) {
>> c.merge(parts, union);
>> }
>> }
>> public int size() {
>> return this.parts.size();
>> }
>> @Override
>> public String toString() {
>> return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
>> }
>> }
>> }
>>
>> Thanks & Regards,
>>
>> *Jainik Vora*
>>
>>
--
Thanks & Regards,
*Jainikkumar Vora*
Software Engineer 2
Data Fabric | Intuit
408-854-0311 | LinkedIn <https://www.linkedin.com/in/jainikvora>