Hi, Unfortunately I found another mysterious issue. If I compile the generated code using Janino, the readObject method will not triggered after an object of that class is deserialized. In case I am compiling the class using Oracle JDK, the readObject method will be triggered as expected. Did any of you see similar phenomenon? What would be the best way to debug this?
I have attached the generated Java code. Thanks in advance, Gábor On 9 May 2016 at 20:41, Gábor Horváth <xazax....@gmail.com> wrote: > Hi, > > Thank you for your support, I was able to solve this issue :) > > Gábor > > On 9 May 2016 at 12:15, Aljoscha Krettek <aljos...@apache.org> wrote: > >> Hi Gábor, >> I used it, yes, but I never encountered such a problem. Let's hope that >> the >> error message Márton discovered helps. :-) >> >> Cheers, >> Aljoscha >> >> On Mon, 9 May 2016 at 11:38 Márton Balassi <balassi.mar...@gmail.com> >> wrote: >> >> > Hi Gabor, >> > >> > I have checked out your branch and tried debugging WordCountPojo to >> > reproduce the behaviour. I am on a Mac with jdk1.8.0_91. I have received >> > the following error when trying to access the constructors of the class >> in >> > question: >> > >> > Exception in thread "main" java.lang.VerifyError: (class: >> > >> > >> org/apache/flink/api/java/typeutils/runtime/generated/Word_GeneratedSerializer, >> > method: deserialize signature: >> > >> > >> (Lorg/apache/flink/core/memory/DataInputView;)Lorg/apache/flink/examples/java/wordcount/WordCountPojo$Word;) >> > Register 3 contains wrong type >> > at java.lang.Class.getDeclaredConstructors0(Native Method) >> > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) >> > at java.lang.Class.getConstructors(Class.java:1651) >> > at >> > >> > >> org.apache.flink.api.java.typeutils.runtime.PojoSerializerGenerator.createSerializer(PojoSerializerGenerator.java:57) >> > at >> > >> > >> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:306) >> > at org.apache.flink.api.java.DataSet.collect(DataSet.java:407) >> > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605) >> > at >> > >> > >> org.apache.flink.examples.java.wordcount.WordCountPojo.main(WordCountPojo.java:119) >> > Disconnected from the target VM, address: '127.0.0.1:52140', transport: >> > 'socket' >> > >> > I hope this helps. >> > >> > Marton >> > >> > On Sun, May 8, 2016 at 2:59 PM, Gábor Horváth <xazax....@gmail.com> >> wrote: >> > >> > > Hi! >> > > >> > > I have created a proof of concept implementation of my GSoC project >> [1]: >> > > introducing code generation to the serializers. The code is available >> > here >> > > [2]. Unfortunately I have run into a problem that I am unable to >> debug. I >> > > generated some code that I compiled using the Janino compiler [4]. I >> did >> > > not get any exception during the compilation, and I was able to get >> the >> > > Class from the compiler. Unfortunately I am unable to get the list of >> the >> > > constructors from the Class and can not debug what is the source of >> the >> > > problem. There are no exceptions thrown, no errors displayed in the >> event >> > > log, no fatal error logfile generated by the JVM, but the process >> > > terminates (at line [3]). I suspect that the Class generated by the >> > Janino >> > > compiler is invalid, but it does not emit any exceptions or warnings. >> I >> > am >> > > using Oracle JDK 8, on Arch Linux. Have you seen any similar problem? >> Is >> > > there a way to debug a situation like this? Is there a way to get >> extra >> > > diagnostics from the Janino compiler? >> > > >> > > @aljoscha: you have some experience with the Janino compiler, what do >> you >> > > think? >> > > >> > > Thanks in advance, >> > > Gábor >> > > >> > > [1] https://issues.apache.org/jira/browse/FLINK-3599 >> > > [2] https://github.com/Xazax-hun/flink/commits/serializer_codegen >> > > [3] >> > > >> > > >> > >> https://github.com/Xazax-hun/flink/commit/af499d5bebe5c1dba6b970977852318346636a8f#diff-7a2080515bac95cec58032655867d6cfR57 >> > > [4] http://unkrig.de/w/Janino >> > > >> > >> > >
package org.apache.flink.api.java.typeutils.runtime.generated; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import static org.apache.flink.util.Preconditions.checkNotNull; public class Word_GeneratedSerializer extends TypeSerializer<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> { private static byte IS_NULL = 1; private static byte NO_SUBCLASS = 2; private static byte IS_SUBCLASS = 4; private static byte IS_TAGGED_SUBCLASS = 8; private static final long serialVersionUID = 1L; private int numFields; private transient Field[] fields; private ExecutionConfig executionConfig; private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache; private transient ClassLoader cl; Class<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> clazz; final org.apache.flink.api.common.typeutils.base.IntSerializer f0; final org.apache.flink.api.common.typeutils.base.StringSerializer f1; public Word_GeneratedSerializer(Class<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> clazz, TypeSerializer<?>[] serializerFields, Field[] reflectiveFields, ExecutionConfig e) { this.clazz = clazz; executionConfig = e; cl = Thread.currentThread().getContextClassLoader(); this.numFields = reflectiveFields.length; this.fields = reflectiveFields; f0 = (org.apache.flink.api.common.typeutils.base.IntSerializer)serializerFields[0]; f1 = (org.apache.flink.api.common.typeutils.base.StringSerializer)serializerFields[1]; for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); } } public boolean isImmutableType() { return false; } private void writeObject(ObjectOutputStream out) throws IOException { out.defaultWriteObject(); out.writeInt(fields.length); for (Field field: fields) { out.writeObject(field.getDeclaringClass()); out.writeUTF(field.getName()); } } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { cl = Thread.currentThread().getContextClassLoader(); in.defaultReadObject(); int numFields = in.readInt(); fields = new Field[numFields]; for (int i = 0; i < numFields; i++) { Class<?> clazz = (Class<?>)in.readObject(); String fieldName = in.readUTF(); fields[i] = null; while(clazz != null) { try { fields[i] = clazz.getDeclaredField(fieldName); fields[i].setAccessible(true); break; } catch (NoSuchFieldException e) { clazz = clazz.getSuperclass(); } } } } public Word_GeneratedSerializer duplicate() { return this; } public org.apache.flink.examples.java.wordcount.WordCountPojo.Word createInstance() { if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) { return null; } try { org.apache.flink.examples.java.wordcount.WordCountPojo.Word t = (org.apache.flink.examples.java.wordcount.WordCountPojo.Word)clazz.newInstance(); fields[0].set(t, f0.createInstance()); fields[1].set(t, f1.createInstance()); return t; } catch (Exception e) { throw new RuntimeException("Cannot instantiate class.", e); } } public org.apache.flink.examples.java.wordcount.WordCountPojo.Word copy(Object from) { if (from == null) return null; Class<?> actualType = from.getClass(); org.apache.flink.examples.java.wordcount.WordCountPojo.Word target; try { target = (org.apache.flink.examples.java.wordcount.WordCountPojo.Word) from.getClass().newInstance(); } catch (Throwable t) { throw new RuntimeException("Cannot instantiate class.", t); } try { Object value; value = fields[0].get(from); if (value != null) { fields[0].set(target, f0.copy(value)); } else { fields[0].set(target, null); } value = fields[1].get(from); if (value != null) { fields[1].set(target, f1.copy(value)); } else { fields[1].set(target, null); } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy."); } return target; } public org.apache.flink.examples.java.wordcount.WordCountPojo.Word copy(Object from, Object resuse) { if (from == null) return null; return copy(from); } public int getLength() { return -1; } public void serialize(Object value, DataOutputView target) throws IOException { int flags = 0; if (value == null) { flags |= IS_NULL; target.writeByte(flags); return; } Integer subclassTag = -1; Class<?> actualClass = value.getClass(); TypeSerializer subclassSerializer = null; if (clazz != actualClass) { // TODO } else { flags |= NO_SUBCLASS; } target.writeByte(flags); if ((flags & IS_SUBCLASS) != 0) { target.writeUTF(actualClass.getName()); } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { target.writeByte(subclassTag); } if ((flags & NO_SUBCLASS) != 0) { try { Object o; o = fields[0].get(value); if (o == null) { target.writeBoolean(true); } else { target.writeBoolean(false); f0.serialize(o, target); } o = fields[1].get(value); if (o == null) { target.writeBoolean(true); } else { target.writeBoolean(false); f1.serialize(o, target); } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy, this should" + "not happen since we check the fields before."); } } else { // TOOD } } public org.apache.flink.examples.java.wordcount.WordCountPojo.Word deserialize(DataInputView source) throws IOException { int flags = source.readByte(); if((flags & IS_NULL) != 0) { return null; } org.apache.flink.examples.java.wordcount.WordCountPojo.Word target = null; if ((flags & IS_SUBCLASS) != 0) { // TODO } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { // TODO } else { target = createInstance(); } if ((flags & NO_SUBCLASS) != 0) { try { boolean isNull; isNull = source.readBoolean(); if (isNull) { fields[0].set(target, null); } else { fields[0].set(target, f0.deserialize(source)); } isNull = source.readBoolean(); if (isNull) { fields[1].set(target, null); } else { fields[1].set(target, f1.deserialize(source)); } } catch (IllegalAccessException e) { throw new RuntimeException("Error during POJO copy, this should not happen" + "since we check the fieldsbefore."); } } else { //TODO } return target; } public org.apache.flink.examples.java.wordcount.WordCountPojo.Word deserialize(Object reuse, DataInputView source) throws IOException { return deserialize(source); } public void copy(DataInputView source, DataOutputView target) throws IOException { int flags = source.readByte(); target.writeByte(flags); if ((flags & IS_NULL) != 0) { return; } if ((flags & IS_SUBCLASS) != 0) { // TODO } else if ((flags & IS_TAGGED_SUBCLASS) != 0) { // TODO } if ((flags & NO_SUBCLASS) != 0) { boolean isNull; isNull = source.readBoolean(); target.writeBoolean(isNull); if (!isNull) { f0.copy(source, target); } isNull = source.readBoolean(); target.writeBoolean(isNull); if (!isNull) { f1.copy(source, target); } } else { // TODO } } public boolean equals(Object obj) { if (obj instanceof Word_GeneratedSerializer) { Word_GeneratedSerializer other = (Word_GeneratedSerializer)obj; return other.canEqual(this) && this.clazz == other.clazz && this.numFields == other.numFields && Objects.equals(this.f0, other.f0) && Objects.equals(this.f1, other.f1) ; } else { return false; } } public boolean canEqual(Object obj) { return obj instanceof Word_GeneratedSerializer; } public int hashCode() { return Objects.hash(clazz, numFields, f0, f1); } }