[ https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163002#comment-16163002 ]
Matteo Ferrario commented on FLINK-7606: ---------------------------------------- Hi Kostas, thanks for your reply. Here are some details about the job we've implemented. The job processes a stream of messages coming from RabbitMQ. Each message is deserialized into the custom 'Payload' POJO using the 'PayloadSchema' class:

{code:java}
import java.io.Serializable;
import java.util.Objects;

public class Payload implements Serializable {

    private static final long serialVersionUID = -7700917163136255068L;

    private String deviceCode;
    private Long stateTimestamp;
    private String phase;

    public Payload() {
    }

    public String getDeviceCode() {
        return deviceCode;
    }

    public void setDeviceCode(String deviceCode) {
        this.deviceCode = deviceCode;
    }

    public Long getStateTimestamp() {
        return stateTimestamp;
    }

    public void setStateTimestamp(Long stateTimestamp) {
        this.stateTimestamp = stateTimestamp;
    }

    public String getPhase() {
        return phase;
    }

    public void setPhase(String phase) {
        this.phase = phase;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Payload payload = (Payload) o;
        return Objects.equals(deviceCode, payload.deviceCode)
                && Objects.equals(stateTimestamp, payload.stateTimestamp)
                && Objects.equals(phase, payload.phase);
    }

    @Override
    public int hashCode() {
        return Objects.hash(deviceCode, stateTimestamp, phase);
    }
}
{code}

{code:java}
import java.io.IOException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class PayloadSchema implements DeserializationSchema<Payload>, SerializationSchema<Payload> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(Payload element) {
        byte[] out = new byte[0];
        try {
            out = mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return out;
    }

    @Override
    public Payload deserialize(byte[] message) {
        Payload highLevelDeviceStatus = null;
        try {
            highLevelDeviceStatus = mapper.readValue(message, Payload.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return highLevelDeviceStatus;
    }

    @Override
    public boolean isEndOfStream(Payload nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Payload> getProducedType() {
        return TypeExtractor.getForClass(Payload.class);
    }
}
{code}

Timestamps and watermarks are assigned to the 'Payload' elements according to the field 'stateTimestamp':

{code:java}
DataStream<Payload> dataStreamSource = env
        .addSource(new RMQSource<>(connectionConfig, "input", new PayloadSchema()))
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Payload>(Time.seconds(10)) {

            private static final long serialVersionUID = -1L;

            @Override
            public long extractTimestamp(Payload element) {
                if (element.getStateTimestamp() == null) {
                    throw new RuntimeException("HighLevelDeviceStatus Timestamp is null during time ordering for device ["
                            + element.getDeviceCode() + "]");
                }
                Date timestamp = getTimeFromJsonTimestamp(element.getStateTimestamp());
                logger.debug("DeviceCode [" + element.getDeviceCode() + "] Time [" + df.format(timestamp)
                        + "] Watermark [" + timestamp.getTime() + "]");
                return timestamp.getTime();
            }
        })
        .uid("ciao")
        .name("DEVICE_HL_STATUS");
{code}

The job tries to identify a specific pattern in the stream, generates an event and sends it to RabbitMQ:

{code:java}
Pattern<Payload, ?> pattern = Pattern
        .<Payload>begin("start")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return !value.getPhase().equals("Start");
            }
        })
        .next("end")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return value.getPhase().equals("Start");
            }
        })
        .within(Time.minutes(5));

PatternFlatSelectFunction<Payload, Map> patternFlatSelectFunction =
        (statusMap, collector) -> collector.collect(new Synth().synthesize(statusMap));

PatternStream<Payload> patternStreamStartOfCycle = CEP.pattern(
        dataStreamSource.keyBy((KeySelector<Payload, Object>) value -> value.getDeviceCode()),
        pattern);

DataStream<Map> outputStream = patternStreamStartOfCycle.flatSelect(patternFlatSelectFunction);

outputStream.addSink(new RMQSink(connectionConfig, "events", new MapSchema())).name("prova");
{code}

> Memory leak on NestedMapsStateTable
> -----------------------------------
>
> Key: FLINK-7606
> URL: https://issues.apache.org/jira/browse/FLINK-7606
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.1
> Reporter: Matteo Ferrario
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a
> specific field of the message.
> We also added the "within" clause (set to 5 minutes), indicating that two
> incoming messages match the pattern only if they arrive within a certain time
> window.
> What we've seen is that for every key present in the messages, an NFA object
> is instantiated in the NestedMapsStateTable and never deallocated.
> The "within" clause didn't help either: if we send messages that don't match
> the pattern, the memory grows (I suppose the state of the NFA is updated),
> but it is not cleaned up even after the 5-minute time window defined in the
> "within" clause.
> If needed, I can provide more details about the job we've implemented and
> also screenshots of the memory leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
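To make the reported behavior concrete, here is a minimal plain-Java sketch (NOT Flink code; the class and method names 'KeyedStateSketch', 'onElement' and 'pruneExpired' are hypothetical) of what we observe: one state entry is created per distinct deviceCode and nothing evicts it, so the keyed-state map grows with the number of keys. The 'pruneExpired' step models what we expected the 'within(Time.minutes(5))' clause to do once the watermark passes the window boundary.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Minimal stdlib sketch (not Flink's NestedMapsStateTable) of the growth
// described above: state is created per key on first access and never removed.
public class KeyedStateSketch {

    static final long WITHIN_MS = 5 * 60 * 1000; // the 5-minute "within" window

    // key -> timestamp of the last element seen for that key
    final Map<String, Long> statePerKey = new HashMap<>();

    void onElement(String deviceCode, long timestampMs) {
        // state is created/updated here, but nothing in this path deletes it,
        // so the map grows with every new device code
        statePerKey.put(deviceCode, timestampMs);
    }

    // what we expected once the "within" window elapses:
    // entries whose last element is older than the window are dropped
    void pruneExpired(long watermarkMs) {
        statePerKey.entrySet().removeIf(e -> e.getValue() < watermarkMs - WITHIN_MS);
    }

    public static void main(String[] args) {
        KeyedStateSketch sketch = new KeyedStateSketch();
        for (int i = 0; i < 1000; i++) {
            sketch.onElement("device-" + i, i); // 1000 distinct keys
        }
        System.out.println(sketch.statePerKey.size()); // grows with the key count: 1000

        // advancing the "watermark" past every element plus the window
        // should free all per-key state
        sketch.pruneExpired(1000 + WITHIN_MS);
        System.out.println(sketch.statePerKey.size()); // 0 after pruning
    }
}
{code}

In our job the pruning step never seems to happen: heap dumps show one NFA per deviceCode retained in the NestedMapsStateTable indefinitely.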