Hi,

Could you describe what you observed in more detail? What exactly are you
comparing with the session window state "merging-window-set": "newKeysInState"
or "existingKeysInState"?

BTW, since we use list state as the main state for the window operator, and we
use RocksDB's merge operation for window state add operations, the estimated
number of keys can be inaccurate [1]:
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) deletion on non-existing keys
  // (4) low number of samples
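To make points (1) and (3) concrete, here is a simplified model of that
estimate. It assumes, based on the linked source, that the estimate is
essentially the number of non-deletion entries minus the number of deletion
entries, clamped at zero. Each merge operand written by a list-state add is its
own entry, so one key with three appends contributes three. Compaction,
memtables and sampling are ignored, and all class and method names are
hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of an entry-count based key estimate:
// estimate = (#non-deletion entries) - (#deletion entries), clamped at 0.
// Compaction, memtables and file sampling are deliberately ignored.
final class EstimateNumKeysModel {

    enum OpType { PUT, MERGE, DELETE }

    static final class Op {
        final OpType type;
        final String key;

        Op(OpType type, String key) {
            this.type = type;
            this.key = key;
        }
    }

    static Op put(String key) { return new Op(OpType.PUT, key); }
    static Op merge(String key) { return new Op(OpType.MERGE, key); }
    static Op delete(String key) { return new Op(OpType.DELETE, key); }

    /** Counts PUTs and MERGE operands as entries and subtracts DELETEs. */
    static long estimatedKeys(List<Op> log) {
        long nonDeletions = 0;
        long deletions = 0;
        for (Op op : log) {
            if (op.type == OpType.DELETE) {
                deletions++;
            } else {
                nonDeletions++; // each merge operand is a separate entry
            }
        }
        return nonDeletions <= deletions ? 0 : nonDeletions - deletions;
    }

    /** Distinct keys that are actually live after replaying the log in order. */
    static long actualKeys(List<Op> log) {
        Set<String> live = new HashSet<>();
        for (Op op : log) {
            if (op.type == OpType.DELETE) {
                live.remove(op.key);
            } else {
                live.add(op.key); // a merge on an absent key creates it
            }
        }
        return live.size();
    }

    public static void main(String[] args) {
        // Three list-state appends on one key plus a plain put on another key.
        List<Op> appends = List.of(merge("k1"), merge("k1"), merge("k1"), put("k2"));
        System.out.println(estimatedKeys(appends) + " estimated vs " + actualKeys(appends) + " actual");

        // Deleting a key that never existed drags the estimate below reality.
        List<Op> strayDelete = List.of(put("k1"), delete("nope"));
        System.out.println(estimatedKeys(strayDelete) + " estimated vs " + actualKeys(strayDelete) + " actual");
    }
}
```

Running main prints "4 estimated vs 2 actual" and then "0 estimated vs 1
actual". In this model, appends inflate the estimate and deletions of absent
keys deflate it.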

[1] 
https://github.com/ververica/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/version_set.cc#L919-L924



Best
Yun Tang
________________________________
From: Vishal Santoshi <vishal.santo...@gmail.com>
Sent: Monday, March 15, 2021 5:48
To: user <user@flink.apache.org>
Subject: Re: Question about 
session_aggregate.merging-window-set.rocksdb_estimate-num-keys

All I can think of is that any update on a state key, which I do in my
ProcessFunction, creates an update (essentially an append in RocksDB) that
renders the previous value for the key a tombstone. That need not be reflected
in the count atomically (hence double or triple counts), which is why it is
called an "estimate". But I was not anticipating this much of a difference...
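A toy model of that intuition follows. It assumes an LSM-style store in which
each update appends a new version and older versions are dropped only when
compaction runs. In this model an overwrite writes no tombstone; the older
version is simply shadowed. An estimate that counts stored entries therefore
over-counts frequently updated keys until compaction catches up. The store
and its methods are hypothetical and are not RocksDB code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical append-only store: updates append versions, compact() keeps the newest.
final class OverwriteCompactionModel {

    // Each entry is {key, value}, in write order; older versions are shadowed, not deleted.
    private final List<String[]> entries = new ArrayList<>();

    void put(String key, String value) {
        entries.add(new String[] {key, value});
    }

    /** Newest value for the key, or null if the key was never written. */
    String get(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i)[0].equals(key)) {
                return entries.get(i)[1];
            }
        }
        return null;
    }

    /** An estimate that counts stored entries, shadowed versions included. */
    long estimatedKeys() {
        return entries.size();
    }

    /** Drops shadowed versions, keeping only the newest value per key. */
    void compact() {
        Map<String, String> newest = new LinkedHashMap<>();
        for (String[] e : entries) {
            newest.put(e[0], e[1]); // later writes replace earlier ones
        }
        entries.clear();
        newest.forEach((k, v) -> entries.add(new String[] {k, v}));
    }

    public static void main(String[] args) {
        OverwriteCompactionModel store = new OverwriteCompactionModel();
        // The same key updated three times, e.g. by repeated state.update(...) calls.
        store.put("user-1", "v1");
        store.put("user-1", "v2");
        store.put("user-1", "v3");
        store.put("user-2", "v1");
        System.out.println("before compaction: " + store.estimatedKeys()); // 4
        store.compact();
        System.out.println("after compaction: " + store.estimatedKeys()); // 2
        System.out.println("user-1 = " + store.get("user-1")); // v3
    }
}
```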

On Sun, Mar 14, 2021 at 5:32 PM Vishal Santoshi
<vishal.santo...@gmail.com> wrote:
The reason I ask is that I have a ProcessWindowFunction on that session
window, and I keep key-scoped global state. I maintain a TTL on that state
(which is outside the window state) that is roughly the current watermark +
lateness.
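Below is a hypothetical sketch of the bookkeeping described here. Each key's
state gets an expiry equal to the watermark at its last update plus the
lateness, and an entry is purged once the watermark reaches that point. This
is not Flink code. As far as I know, Flink's built-in StateTtlConfig is
processing-time based, so an event-time expiry like this would typically be
driven by timers. The class and method names are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (not Flink code): key-scoped state that expires once the
// watermark reaches (watermark at last update + lateness).
final class EventTimeExpiringState<K, V> {

    private static final class Entry<T> {
        final T value;
        final long expiresAt;

        Entry(T value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long latenessMillis;
    private long watermark = 0L; // the model starts at 0 rather than Long.MIN_VALUE

    EventTimeExpiringState(long latenessMillis) {
        this.latenessMillis = latenessMillis;
    }

    /** Stores the value and resets its expiry to current watermark + lateness. */
    void update(K key, V value) {
        entries.put(key, new Entry<>(value, watermark + latenessMillis));
    }

    /** Current value, or null if absent or already purged. */
    V value(K key) {
        Entry<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    /** Advances the watermark (never backwards) and purges entries whose expiry it has reached. */
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        entries.values().removeIf(e -> e.expiresAt <= watermark);
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        long threeHours = 3L * 60 * 60 * 1000;
        EventTimeExpiringState<String, String> state = new EventTimeExpiringState<>(threeHours);
        state.update("user-1", "intervals");
        state.advanceWatermark(threeHours - 1);
        System.out.println(state.size()); // 1: still within lateness
        state.advanceWatermark(threeHours);
        System.out.println(state.size()); // 0: purged
    }
}
```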

I would expect the number of keys in that custom state to be roughly equal to
the number of keys in "merging-window-set". It seems to be about twice that
number, though it does follow the same slope. I am trying to figure out the
reason for this deviation.

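public void process(KEY key,
        ProcessWindowFunction<KeyedSession<KEY, VALUE>, KeyedSessionWithSessionID<KEY, VALUE>, KEY, TimeWindow>.Context context,
        Iterable<KeyedSession<KEY, VALUE>> elements,
        Collector<KeyedSessionWithSessionID<KEY, VALUE>> out)
        throws Exception {
    // "state" is key-scoped (presumably a ValueState<IntervalList>) and lives outside the window state
    if (state.value() == null) {
        // first time this key is seen, or its state has expired: create it
        this.newKeysInState.inc();
        state.update(new IntervalList());
    } else {
        // state already exists for this key
        this.existingKeysInState.inc();
    }
    // ... rest of the method elided ...
}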
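Below is a self-contained stand-in for the snippet above. A HashMap replaces
the ValueState, and plain long fields replace the metric counters; all of
these are hypothetical. It illustrates one property worth keeping in mind
when comparing the counters against the RocksDB estimate. A counter that is
only ever incremented is cumulative, so a key whose state expires and later
reappears is counted as new again, while the number of live keys does not
grow.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the process() snippet: a HashMap replaces the key-scoped
// ValueState and two long fields replace the newKeysInState/existingKeysInState counters.
final class KeyCounterModel {

    private final Map<String, Object> state = new HashMap<>();
    long newKeysInState = 0;
    long existingKeysInState = 0;

    /** Same branch structure as the snippet: create state if absent, else count as existing. */
    void process(String key) {
        if (state.get(key) == null) {
            newKeysInState++;
            state.put(key, new Object()); // placeholder for new IntervalList()
        } else {
            existingKeysInState++;
        }
    }

    /** Simulates the TTL removing this key's state. */
    void expire(String key) {
        state.remove(key);
    }

    /** Keys that currently have state, i.e. what a key estimate would ideally report. */
    int liveKeys() {
        return state.size();
    }

    public static void main(String[] args) {
        KeyCounterModel m = new KeyCounterModel();
        m.process("a"); // new
        m.process("a"); // existing
        m.expire("a");  // TTL clears the state
        m.process("a"); // counted as new again
        m.process("b"); // new
        System.out.println(m.newKeysInState + " new, " + m.existingKeysInState
                + " existing, " + m.liveKeys() + " live");
    }
}
```

Running main prints "3 new, 1 existing, 2 live".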

On Sun, Mar 14, 2021 at 3:32 PM Vishal Santoshi
<vishal.santo...@gmail.com> wrote:
Hey folks,

I was looking at this very specific metric:
"session_aggregate.merging-window-set.rocksdb_estimate-num-keys". Does this
metric also count session windows (it is a session window) that have lateness
on them? In essence, if the session window has closed but has an allowed
lateness of a few hours, would those keys still be counted against this metric?

I think they should, as it is an estimate of the number of keys in the column
family for the operator, and if the window has not been GCed, then the keys for
those windows should still be in RocksDB. But I wanted to be sure.
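Below is a sketch of the event-time cleanup rule as I understand Flink's
WindowOperator to apply it. It is simplified and is not the actual source.
A window [start, end) has maxTimestamp = end - 1, and its state is scheduled
for cleanup at maxTimestamp + allowedLateness. That cleanup timer fires once
the watermark reaches it. Under this assumption, a session window that has
already fired but is still within its allowed lateness keeps its state, and
with it its keys in RocksDB. The class and method names are hypothetical.

```java
// Hypothetical sketch of the event-time window cleanup rule (simplified, not Flink source).
final class WindowCleanupModel {

    /** A time window [start, end) has its last valid timestamp at end - 1. */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Cleanup time = maxTimestamp + allowedLateness, saturating at Long.MAX_VALUE on overflow. */
    static long cleanupTime(long windowEnd, long allowedLateness) {
        long max = maxTimestamp(windowEnd);
        long cleanup = max + allowedLateness;
        return cleanup >= max ? cleanup : Long.MAX_VALUE;
    }

    /** Window state survives until an event-time timer at cleanupTime fires (watermark >= timer). */
    static boolean stateStillPresent(long windowEnd, long allowedLateness, long watermark) {
        return watermark < cleanupTime(windowEnd, allowedLateness);
    }

    public static void main(String[] args) {
        long windowEnd = 1_000_000L;
        long lateness = 3L * 60 * 60 * 1000; // 3 hours of allowed lateness
        System.out.println(cleanupTime(windowEnd, lateness));                    // 11799999
        System.out.println(stateStillPresent(windowEnd, lateness, 2_000_000L));  // true: fired, not yet cleaned
        System.out.println(stateStillPresent(windowEnd, lateness, 11_799_999L)); // false: cleaned up
    }
}
```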

Regards.

