[ https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16509808#comment-16509808 ]
swy edited comment on FLINK-9506 at 6/12/18 4:10 PM:
-----------------------------------------------------

Hi [~sihuazhou], I would like to close the ticket too, but the problem persists even though the reducing state is no longer in use and has been replaced with ListState, as you suggested. However, further investigation shows that the problem is caused by "KeyBy" instead. Please refer to KeyBy.png.

1. The first run is without KeyBy:

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run is with KeyBy and with the ProcessAggregation logic (the logic uses ListState to store all records, which are summed up when the timer is triggered):

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
        .keyBy(new KeySelector<Record, Integer>() {
            @Override
            public Integer getKey(Record r) throws Exception {
                return r.getUNIQUE_KEY().hashCode() * 31;
            }
        })
        .process(new ProcessAggregation());

3. The third run is with KeyBy and an empty ProcessAggregation logic.

The results show that ProcessAggregation is not the root cause of the fluctuation: there is no difference between the empty logic and the logic with ListState. The fluctuation seems to be caused by "KeyBy". Any idea why? Thank you.


was (Author: yow):
    Hi [~sihuazhou], I would like to close the ticket too, but the problem persists even though the reducing state is no longer in use and has been replaced with ListState, as you suggested. However, further investigation shows that the problem is caused by "KeyBy" instead. Please refer to KeyBy.png.

1. The first run is without KeyBy:

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator");

2. The second run is with KeyBy and with the ProcessAggregation logic (the logic uses ListState to store all records, which are summed up when the timer is triggered):

    DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
        .map(new JsonToRecordTranslator()).name("JsonRecTranslator")
        .keyBy(new KeySelector<Record, Integer>() {
            @Override
            public Integer getKey(Record r) throws Exception {
                return r.getUNIQUE_KEY().hashCode() * 31;
            }
        })
        .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
        .name("AggregationDuration: " + aggrDuration + "ms");

3. The third run is with KeyBy and an empty ProcessAggregation logic.

The results show that ProcessAggregation is not the root cause of the fluctuation: there is no difference between the empty logic and the logic with ListState in ProcessAggregation. The fluctuation seems to be caused by "KeyBy". Any idea why? Thank you.


> Flink ReducingState.add causing more than 100% performance drop
> ---------------------------------------------------------------
>
>                 Key: FLINK-9506
>                 URL: https://issues.apache.org/jira/browse/FLINK-9506
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.4.2
>            Reporter: swy
>            Priority: Major
>         Attachments: KeyNoHash_VS_KeyHash.png, flink.png, keyby.png
>
>
> Hi, we found that application performance drops by more than 100% when
> ReducingState.add is used in the source code. In the test, checkpointing is
> disabled and the filesystem (HDFS) is used as the state backend.
> It can be easily reproduced with a simple app, without checkpointing, that
> simply keeps storing records, with a simple reduction function (in fact, an
> empty function shows the same result). Any idea would be appreciated.
> What an unbelievably obvious issue.
> Basically, the app just keeps storing records into the state, and we measure how
> many records per second pass through "JsonTranslator", which is shown in the graph.
> The only difference between the runs is one line: commenting or uncommenting "recStore.add(r)".
> {code}
> DataStream<String> stream = env.addSource(new GeneratorSource(loop));
> DataStream<JSONObject> convert = stream.map(new JsonTranslator())
>        .keyBy() // key selector omitted in the original report
>        .process(new ProcessAggregation())
>        .map(new PassthruFunction());
> public class ProcessAggregation extends ProcessFunction<Record, Record> {
>    private ReducingState<Record> recStore;
>    @Override
>    public void processElement(Record r, Context ctx, Collector<Record> out) throws Exception {
>        recStore.add(r); // this line makes the difference
>    }
> }
> {code}
> Record is a POJO class containing 50 private String members.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
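
For readers without access to the full job, the aggregation described in the comment (buffer every record per key, then sum when a timer fires) can be sketched in plain Java. This is only a stand-in and not Flink code: the class name KeyedBufferSketch, the methods add and onTimer, and the use of a long value per record are assumptions made for illustration. The real job uses ListState inside a ProcessFunction, and its Record type holds 50 String fields. Only the key derivation (hashCode() * 31) is taken from the KeySelector shown in the comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java stand-in for the keyed aggregation described in the comment (not Flink code). */
final class KeyedBufferSketch {
    // Stand-in for per-key ListState: every value seen for a key since the last timer.
    private final Map<Integer, List<Long>> buffers = new HashMap<>();

    /** Same key derivation as the KeySelector in the comment. */
    static int keyOf(String uniqueKey) {
        return uniqueKey.hashCode() * 31;
    }

    /** Mirrors processElement: only appends to the buffer, no aggregation yet. */
    void add(String uniqueKey, long value) {
        buffers.computeIfAbsent(keyOf(uniqueKey), k -> new ArrayList<>()).add(value);
    }

    /** Mirrors the timer callback: sums the buffered values for one key, then clears them. */
    long onTimer(String uniqueKey) {
        List<Long> buffered = buffers.remove(keyOf(uniqueKey));
        long sum = 0L;
        if (buffered != null) {
            for (long v : buffered) {
                sum += v;
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        KeyedBufferSketch sketch = new KeyedBufferSketch();
        sketch.add("user-1", 2L);
        sketch.add("user-1", 3L);
        sketch.add("user-2", 10L);
        System.out.println(sketch.onTimer("user-1")); // prints 5
        System.out.println(sketch.onTimer("user-1")); // prints 0, the buffer was cleared
    }
}
```

The sketch says nothing about why KeyBy affects throughput. It only makes the "store everything, sum on timer" logic from the second run concrete, so that it can be compared with the empty logic used in the third run.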