[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503284#comment-16503284 ]

swy commented on FLINK-9506:
----------------------------

[~srichter] Thanks for the tips. After switching to RocksDB the performance scales 
much better and fluctuates a little less. I have a few questions related to 
RocksDB.
1. Just to confirm, does RocksDB need to be set up on every TM machine? Is there 
any other option?
2. What is the recommended storage for the RocksDB state backend? We currently 
keep checkpoints on tmpfs and persist savepoints to HDFS.
3. From the source code, RocksDB options such as parallelism and certain 
predefined option profiles can be configured; are there corresponding parameters 
in flink-conf.yaml? (see also the sketch after the code below)
4. Below is the configuration we are using; could you please point out anything 
that is not right?
{code}
env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
RocksDBStateBackend rocksdb = new RocksDBStateBackend(
        new FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
env.setStateBackend(rocksdb);
//rocksdb.setOptions(new RocksdbOptions());
rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
{code}
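On question 3, in code we believe the hook behind the commented-out setOptions 
line is the OptionsFactory interface from the RocksDB state backend (the 
RocksdbOptions class above was just a placeholder). A minimal sketch; the thread 
count and buffer size are example values only, not recommendations:

{code}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch only: tune RocksDB through an OptionsFactory instead of
// relying solely on a predefined profile.
public class ExampleOptionsFactory implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // More background threads for flushes/compactions (example value).
        return currentOptions.setIncreaseParallelism(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        // Larger memtable before flushing to disk: 64 MB (example value).
        return currentOptions.setWriteBufferSize(64 * 1024 * 1024);
    }
}

// usage: rocksdb.setOptions(new ExampleOptionsFactory());
{code}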


Or in flink-conf.yaml:
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir: file:///tmp/rocksdb_simple_example/savepoints
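
Related to questions 1 and 3: RocksDB runs embedded inside each TaskManager 
(nothing extra to install), so we assume its local working directory just needs 
to point at fast local storage on every TM, e.g. (assuming the 
state.backend.rocksdb.localdir key applies to this version):
state.backend.rocksdb.localdir: /tmp/rocksdb_simple_example/local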


Thank you in advance! 

> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found that application performance drops by more than 100% when 
> ReducingState.add is used in the source code. In the test, checkpointing is 
> disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app: no checkpointing, just keep 
> storing records with a simple reduce function (in fact, an empty function 
> shows the same result). Any idea would be appreciated. 
> What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure 
> how many records per second pass through "JsonTranslator", as shown in the 
> graph. The difference between the two runs is just one line: 
> commenting/un-commenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy(r -> r.getKey()) // key selector elided in the original
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.
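> The shape of Record is roughly as follows (field names are hypothetical; only 
> the 50-String layout matters):
> {code}
> public class Record {
>     private String field1;
>     private String field2;
>     // ... 48 more private String fields, plus getters/setters
> }
> {code}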


