[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16510632#comment-16510632 ]
swy commented on FLINK-9506:
----------------------------

Hi [~sihuazhou], to your questions:

1. Checkpointing is disabled at the moment, but in flink-conf.yaml incremental checkpointing is enabled for RocksDB.
2. No difference even with the onTimer content commented out.
3. Please refer to the sample code below from 'ProcessAggregation':

{code}
public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
    recordStore.add(r);

    // emit an audit-only copy of the incoming record
    Record auditRec = new Record();
    auditRec.setAuditOnly(true);
    auditRec.setInput_id(r.getInput_id());
    auditRec.setInput_type(r.getInput_type());
    auditRec.setOutput_id(r.getOutput_id());
    auditRec.setOutput_type(r.getOutput_type());
    auditRec.setAddkey(r.getAddkey());
    auditRec.setSource_id(r.getSource_id());
    auditRec.setINPUT_LINK(r.getINPUT_LINK());
    auditRec.setFilename(r.getFilename());
    auditRec.setOUTPUT_LINK(r.getOUTPUT_LINK());
    auditRec.setEL_COUNTER_IN(1);
    auditRec.setEL_COUNTER_STORED(1);
    out.collect(auditRec);

    // target time is rounded down to a whole second, so repeated registrations
    // within the same second collapse into a single timer per key
    ctx.timerService().registerProcessingTimeTimer(
            ((ctx.timerService().currentProcessingTime() + aggrWindowsIntervalMs) / 1000) * 1000);

    if (countMarker != null) {
        countMarker.count();
    }
}

public void onTimer(long timestamp, OnTimerContext ctx, Collector<Record> out) throws Exception {
    Iterable<Record> records = recordStore.get();
    int primary_units = 0;
    int secondary_units = 0;
    int tertiary_units = 0;
    int numReduce = -1;
    Record lastRecord = null;

    // sum the buffered records, using the last one as the carrier of the aggregate
    for (Record rec : records) {
        primary_units += rec.getI_PRIMARY_UNITS();
        secondary_units += rec.getI_SECONDARY_UNITS();
        tertiary_units += rec.getI_TERTIARY_UNITS();
        lastRecord = rec;
        numReduce++;
    }

    if (lastRecord != null) {
        lastRecord.setI_PRIMARY_UNITS(primary_units);
        lastRecord.setI_SECONDARY_UNITS(secondary_units);
        lastRecord.setI_TERTIARY_UNITS(tertiary_units);
        lastRecord.setPARTIALS_COMBINED_(numReduce);
        lastRecord.setEL_COUNTER_RETRIEVED(1);
        lastRecord.setEL_COUNTER_REDUCED(numReduce);
        out.collect(lastRecord);
    }

    recordStore.clear();
}
{code}

There is a new observation: while the timer (which is used to flush the records) is running, input from the source stops. Please refer to the new attachment 'input_stop_when_timer_run.png'. Is this something expected?
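For context, the snippet above does not show how recordStore is declared; since recordStore.get() returns an Iterable<Record>, it behaves like list-style keyed state. A minimal sketch of the declaration and initialization inside ProcessAggregation, with the descriptor name assumed rather than copied from the actual job:

{code}
// Sketch only (assumed, not the reporter's actual declaration).
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;

private transient ListState<Record> recordStore;

@Override
public void open(Configuration parameters) throws Exception {
    // register keyed list state; "recordStore" is a made-up descriptor name
    ListStateDescriptor<Record> descriptor =
            new ListStateDescriptor<>("recordStore", Record.class);
    recordStore = getRuntimeContext().getListState(descriptor);
}
{code}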
> Flink ReducingState.add causing more than 100% performance drop
> ----------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when ReducingState.add is used in the source code. In the test, checkpointing is disabled and filesystem (HDFS) is used as the state backend.
> It can easily be reproduced with a simple app: without checkpointing, just keep storing records, with a simple reduction function (in fact, an empty function shows the same result). Any idea would be appreciated. What an unbelievably obvious issue.
> Basically the app just keeps storing records into the state, and we measure how many records per second in "JsonTranslator", which is shown in the graph. The difference is just 1 line: comment/un-comment "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
>
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>                                        .keyBy()
>                                        .process(new ProcessAggregation())
>                                        .map(new PassthruFunction());
>
> public class ProcessAggregation extends ProcessFunction {
>     private ReducingState<Record> recStore;
>
>     public void processElement(Record r, Context ctx, Collector<Record> out) {
>         recStore.add(r); // this line makes the difference
>     }
> }
> {code}
> Record is a POJO class containing 50 private String members.
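For reference, the repro snippet above never shows how recStore is initialized. A minimal, self-contained sketch of ProcessAggregation, assuming the state is registered in open() with a trivial ReduceFunction (the "empty function" the description mentions); the descriptor name and the keep-latest reduce behaviour are assumptions, not taken from the reporter's job:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Record is the reporter's POJO with 50 private String members; it is not defined here.
public class ProcessAggregation extends ProcessFunction<Record, Record> {

    private transient ReducingState<Record> recStore;

    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Record> descriptor = new ReducingStateDescriptor<>(
                "recStore",                           // made-up state name
                (ReduceFunction<Record>) (a, b) -> b, // trivial reduction: keep the newest record
                Record.class);
        recStore = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
        recStore.add(r); // the single line whose presence/absence changes throughput
    }
}
{code}

Because this reducer keeps only the latest value, the state holds at most one Record per key; a heavier reduce function can be swapped in without changing the rest of the sketch.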