Hello, 

Thank you for your answer and apologies for the late response.

For timers we are using : 

state.backend.rocksdb.timer-service.factory: rocksdb 

Are we still affected by [1] ?

For the interruptibility, we have coalesced our timers and the application
became more responsive to stop signals.

Also, after long investigations, we've found that we were abusing/misusing
AggregatingState & RocksDB :-/

The pseudo code of our window function looked like the following :
/    ........
    private AggregatingState<*IN*,OUT> state;
    .......
    @Override
    public void apply(Tuple4<String,String,String,String> key, TimeWindow
window, Iterable<IN> input, Collector<OUT> out) throws Exception {
        Iterator<IN> it= input.iterator();
        while (it.hasNext()){
            *state.add(it.next());*
        }
        out.collect(state.get());
    }
    ......../

Doing so, leads to call getInternal/updateInternal in state.add on RocksDB
for each inputItem and causes a huge pressure on RocksDB.

We transformed the code in order to iterate over items in the
AggregagtingFunction instead and call the state.add only once:

/    ........
    private AggregatingState<*Iterable<IN>*,OUT> state;
    .......
    @Override
    public void apply(Tuple4<String,String,String,String> key, TimeWindow
window, Iterable<IN> input, Collector<OUT> out) throws Exception {
       * state.add(input);*
        out.collect(state.get());
    }
    ........
/
Is this the right way to do so ?

Since this modifications, the application is more stable and the recover
time falls to few minutes.

Thank you for your help.

Best regards,
Amine



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to