[ https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372959#comment-15372959 ]
ASF GitHub Bot commented on FLINK-4149: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2202#discussion_r70449634 --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java --- @@ -454,4 +458,126 @@ static String generateStateName(final String name, final int index) { return name + "_" + index; } } + + /** + * {@link TypeSerializer} for {@link NFA} that uses Java Serialization. + */ + public static class Serializer<T> extends TypeSerializer<NFA<T>> { + private static final long serialVersionUID = 1L; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer<NFA<T>> duplicate() { + return this; + } + + @Override + public NFA<T> createInstance() { + return null; + } + + @Override + public NFA<T> copy(NFA<T> from) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(from); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + ByteArrayInputStream bais = new ByteArrayInputStream(data); + ObjectInputStream ois = new ObjectInputStream(bais); + + @SuppressWarnings("unchecked") + NFA<T> copy = (NFA<T>) ois.readObject(); + return copy; + } catch (IOException|ClassNotFoundException e) { + throw new RuntimeException("Could not copy NFA.", e); + } + } + + @Override + public NFA<T> copy(NFA<T> from, NFA<T> reuse) { + return copy(from); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NFA<T> record, DataOutputView target) throws IOException { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + + oos.writeObject(record); + + oos.close(); + baos.close(); + + byte[] data = baos.toByteArray(); + + target.writeInt(data.length); + target.write(data); --- End diff -- Can't we use the `DataOutputViewStream`? ``` ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target)); oos.writeObject(record); oos.close(); ``` > Fix Serialization of NFA in AbstractKeyedCEPPatternOperator > ----------------------------------------------------------- > > Key: FLINK-4149 > URL: https://issues.apache.org/jira/browse/FLINK-4149 > Project: Flink > Issue Type: Bug > Components: CEP > Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3 > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.1.0 > > > A job that uses CEP fails upon restore with a {{NullPointerException}} in > {{NFA.process()}}. The reason seems to be that field {{computationStates}} is > {{null}}. This field is transient and read in a custom {{readObject()}} > method. > In {{AbstractKeyedCEPPatternOperator}} this snipped is used to construct a > {{StateDescriptor}} for an {{NFA}} state: > {code} > new ValueStateDescriptor<NFA<IN>>( > NFA_OPERATOR_STATE_NAME, > new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, > getExecutionConfig()), > null) > {code} > It seems Kryo does not invoke {{readObject}}/{{writeObject}}. We probably > need a custom {{TypeSerializer}} for {{NFA}} to solve the problem. -- This message was sent by Atlassian JIRA (v6.3.4#6332)