[ 
https://issues.apache.org/jira/browse/FLINK-7606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16163002#comment-16163002
 ] 

Matteo Ferrario commented on FLINK-7606:
----------------------------------------

Hi Kostas,
thanks for your reply.
Here are some details about the job we've implemented.
The job processes a stream of messages coming from RabbitMQ.
Each message is deserialized into the custom 'Payload' POJO using the 
'PayloadSchema' class:

{code:java}
public class Payload implements Serializable {

    private static final long serialVersionUID = -7700917163136255068L;

    private String deviceCode;
    private Long stateTimestamp;
    private String phase;

    public Payload() {

    }

    public String getDeviceCode() {
        return deviceCode;
    }

    public void setDeviceCode(String deviceCode) {
        this.deviceCode = deviceCode;
    }

    public Long getStateTimestamp() {
        return stateTimestamp;
    }

    public void setStateTimestamp(Long stateTimestamp) {
        this.stateTimestamp = stateTimestamp;
    }

    public String getPhase() {
        return phase;
    }

    public void setPhase(String phase) {
        this.phase = phase;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Payload payload = (Payload) o;
        return Objects.equals(deviceCode, payload.deviceCode)
                && Objects.equals(stateTimestamp, payload.stateTimestamp)
                && Objects.equals(phase, payload.phase);
    }

    @Override
    public int hashCode() {
        return Objects.hash(deviceCode, stateTimestamp, phase);
    }
}
{code}

{code:java}
public class PayloadSchema implements DeserializationSchema<Payload>,
        SerializationSchema<Payload> {
    private ObjectMapper mapper = new ObjectMapper();

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(Payload element) {
        byte[] out = new byte[0];
        try {
            out = mapper.writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return out;
    }

    @Override
    public Payload deserialize(byte[] message) {
        Payload highLevelDeviceStatus = null;
        try {
            highLevelDeviceStatus = mapper.readValue(message, Payload.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return highLevelDeviceStatus;
    }

    @Override
    public boolean isEndOfStream(Payload nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Payload> getProducedType() {
        return TypeExtractor.getForClass(Payload.class);
    }
}
{code}

Event timestamps are taken from the 'stateTimestamp' field of each 'Payload', 
with up to 10 seconds of out-of-orderness allowed:

{code:java}
DataStream<Payload> dataStreamSource =
        env.addSource(new RMQSource<>(connectionConfig,
                "input",
                new PayloadSchema()))
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Payload>(Time.seconds(10)) {
                    private static final long serialVersionUID = -1L;
                    @Override
                    public long extractTimestamp(Payload element) {
                        if (element.getStateTimestamp() == null) {
                            throw new RuntimeException(
                                    "HighLevelDeviceStatus Timestamp is null during time ordering for device ["
                                            + element.getDeviceCode() + "]");
                        }
                        Date timestamp = getTimeFromJsonTimestamp(element.getStateTimestamp());

                        logger.debug("DeviceCode [" + element.getDeviceCode()
                                + "] Time [" + df.format(timestamp)
                                + "] Watermark [" + timestamp.getTime() + "]");
                        return timestamp.getTime();
                    }
                })
                .uid("ciao")
                .name("DEVICE_HL_STATUS");
{code}
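
Note that the debug line above prints the event timestamp under the label 
"Watermark"; the watermark itself trails the highest timestamp seen by the 
configured 10 seconds. A minimal plain-Java sketch of that bookkeeping, 
assuming the usual "max timestamp minus allowed out-of-orderness" rule (the 
class name BoundedOutOfOrdernessSketch and its methods are hypothetical, this 
is not Flink's own class):

```java
// Plain-Java illustration only; NOT Flink's BoundedOutOfOrdernessTimestampExtractor.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low enough that the subtraction below cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    /** Records an event timestamp and returns it unchanged. */
    long onEvent(long eventTimestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMillis);
        return eventTimestampMillis;
    }

    /** Highest timestamp seen minus the allowed out-of-orderness; never moves backwards. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMillis;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(10_000L);
        sketch.onEvent(60_000L);
        System.out.println(sketch.currentWatermark()); // prints 50000
        sketch.onEvent(55_000L);                       // late event, watermark stays put
        System.out.println(sketch.currentWatermark()); // prints 50000
    }
}
```

This matters for the "within" clause, since event-time cleanup can only 
progress as far as the watermark has advanced.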

The job looks for a specific pattern in the stream, generates an event for 
each match, and sends it to RabbitMQ:

{code:java}
Pattern<Payload, ?> pattern = Pattern
        .<Payload>begin("start")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return !value.getPhase().equals("Start");
            }
        })
        .next("end")
        .subtype(Payload.class)
        .where(new SimpleCondition<Payload>() {
            @Override
            public boolean filter(Payload value) throws Exception {
                return value.getPhase().equals("Start");
            }
        })
        .within(Time.minutes(5));

PatternFlatSelectFunction<Payload, Map> patternFlatSelectFunction =
        (statusMap, collector) -> collector.collect(new Synth().synthesize(statusMap));

PatternStream<Payload> patternStreamStartOfCycle = CEP.pattern(
        dataStreamSource.keyBy((KeySelector<Payload, Object>) value -> value.getDeviceCode()),
        pattern);

DataStream<Map> outputStream =
        patternStreamStartOfCycle.flatSelect(patternFlatSelectFunction);

outputStream.addSink(new RMQSink(connectionConfig, "events", new MapSchema()))
        .name("prova");
{code}
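
To make the intended matching explicit, here is a rough plain-Java sketch of 
what the pattern above is meant to detect per device: a non-"Start" event 
immediately followed by a "Start" event, less than 5 minutes apart. The class 
StartOfCycleSketch and its map are hypothetical stand-ins, not how Flink 
stores NFA state, and the strict "less than" window boundary is an assumption:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the intended per-key matching; NOT Flink CEP internals.
final class StartOfCycleSketch {
    static final long WINDOW_MILLIS = 5 * 60 * 1000L;

    /** Timestamp of the latest non-"Start" event per device (the open partial match). */
    private final Map<String, Long> pendingByDevice = new HashMap<>();

    /** Feeds one event, in timestamp order per device; returns true when a match completes. */
    boolean onEvent(String deviceCode, String phase, long timestampMillis) {
        if ("Start".equals(phase)) {
            // "end" step: matches only if the directly preceding event was non-"Start".
            Long startTimestamp = pendingByDevice.remove(deviceCode);
            return startTimestamp != null
                    && timestampMillis - startTimestamp < WINDOW_MILLIS; // boundary assumed
        }
        // "start" step: a non-"Start" event opens, or replaces, the partial match.
        pendingByDevice.put(deviceCode, timestampMillis);
        return false;
    }

    /** Number of devices that currently hold an open partial match. */
    int trackedDevices() {
        return pendingByDevice.size();
    }
}
```

In this sketch, an entry for a device that never sends "Start" stays in the 
map indefinitely unless something expires it, which is the kind of per-key 
growth we are observing in the NestedMapsStateTable.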

> Memory leak on NestedMapsStateTable
> -----------------------------------
>
>                 Key: FLINK-7606
>                 URL: https://issues.apache.org/jira/browse/FLINK-7606
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1
>            Reporter: Matteo Ferrario
>
> The NestedMapsStateTable grows continuously without freeing heap memory.
> We created a simple job that processes a stream of messages and uses CEP to 
> generate an outcome message when a specific pattern is identified.
> The messages coming from the stream are grouped by a key defined in a 
> specific field of the message.
> We've also added the "within" clause (set to 5 minutes), indicating that two 
> incoming messages match the pattern only if they arrive within a certain time 
> window.
> What we've seen is that for every key present in the messages, an NFA object 
> is instantiated in the NestedMapsStateTable and is never deallocated.
> The "within" clause didn't help either: if we send messages that don't match 
> the pattern, the memory grows (I suppose that the state of the NFA is 
> updated), but it is not cleaned up even after the 5-minute time window 
> defined in the "within" clause.
> If needed, I can provide more details about the job we've implemented, as 
> well as screenshots of the memory leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
