[ 
https://issues.apache.org/jira/browse/FLINK-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15372960#comment-15372960
 ] 

ASF GitHub Bot commented on FLINK-4149:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2202#discussion_r70449753
  
    --- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -454,4 +458,126 @@ static String generateStateName(final String name, 
final int index) {
                        return name + "_" + index;
                }
        }
    +
    +   /**
    +    * {@link TypeSerializer} for {@link NFA} that uses Java Serialization.
    +    */
    +   public static class Serializer<T> extends TypeSerializer<NFA<T>> {
    +           private static final long serialVersionUID = 1L;
    +
    +           @Override
    +           public boolean isImmutableType() {
    +                   return false;
    +           }
    +
    +           @Override
    +           public TypeSerializer<NFA<T>> duplicate() {
    +                   return this;
    +           }
    +
    +           @Override
    +           public NFA<T> createInstance() {
    +                   return null;
    +           }
    +
    +           @Override
    +           public NFA<T> copy(NFA<T> from) {
    +                   try {
    +                           ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
    +                           ObjectOutputStream oos = new 
ObjectOutputStream(baos);
    +
    +                           oos.writeObject(from);
    +
    +                           oos.close();
    +                           baos.close();
    +
    +                           byte[] data = baos.toByteArray();
    +
    +                           ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
    +                           ObjectInputStream ois = new 
ObjectInputStream(bais);
    +
    +                           @SuppressWarnings("unchecked")
    +                           NFA<T> copy = (NFA<T>) ois.readObject();
    +                           return copy;
    +                   } catch (IOException|ClassNotFoundException e) {
    +                           throw new RuntimeException("Could not copy 
NFA.", e);
    +                   }
    +           }
    +
    +           @Override
    +           public NFA<T> copy(NFA<T> from, NFA<T> reuse) {
    +                   return copy(from);
    +           }
    +
    +           @Override
    +           public int getLength() {
    +                   return 0;
    +           }
    +
    +           @Override
    +           public void serialize(NFA<T> record, DataOutputView target) 
throws IOException {
    +                   try {
    +                           ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
    +                           ObjectOutputStream oos = new 
ObjectOutputStream(baos);
    +
    +                           oos.writeObject(record);
    +
    +                           oos.close();
    +                           baos.close();
    +
    +                           byte[] data = baos.toByteArray();
    +
    +                           target.writeInt(data.length);
    +                           target.write(data);
    +                   } catch (IOException e) {
    +                           throw new RuntimeException("Could not serialize 
NFA.", e);
    +                   }
    +           }
    +
    +           @Override
    +           public NFA<T> deserialize(DataInputView source) throws 
IOException {
    +                   try {
    +                           int size = source.readInt();
    +
    +                           byte[] data = new byte[size];
    +
    +                           source.readFully(data);
    +
    +                           ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
    +                           ObjectInputStream ois = new 
ObjectInputStream(bais);
    +
    +                           @SuppressWarnings("unchecked")
    +                           NFA<T> copy = (NFA<T>) ois.readObject();
    +                           return copy;
    --- End diff --
    
    Same here with the `DataInputViewStream`.


> Fix Serialization of NFA in AbstractKeyedCEPPatternOperator
> -----------------------------------------------------------
>
>                 Key: FLINK-4149
>                 URL: https://issues.apache.org/jira/browse/FLINK-4149
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.0.0, 1.1.0, 1.0.1, 1.0.2, 1.0.3
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> A job that uses CEP fails upon restore with a {{NullPointerException}} in 
> {{NFA.process()}}. The reason seems to be that field {{computationStates}} is 
> {{null}}. This field is transient and read in a custom {{readObject()}} 
> method.
> In {{AbstractKeyedCEPPatternOperator}} this snipped is used to construct a 
> {{StateDescriptor}} for an {{NFA}} state:
> {code}
> new ValueStateDescriptor<NFA<IN>>(
>     NFA_OPERATOR_STATE_NAME,
>     new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, 
> getExecutionConfig()),
>     null)
> {code}
> It seems Kryo does not invoke {{readObject}}/{{writeObject}}. We probably 
> need a custom {{TypeSerializer}} for {{NFA}} to solve the problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to