Hi, We're using flink 1.3.1, and we're trying to pass through the pipeline a POJO object that has a generic field )see details in the complete example below): We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with a specific SomeKey, we get the following exception:
java.lang.RuntimeException: Cannot instantiate class. at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206) at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalArgumentException: Can not set ....BarKey field ...Foo.someKey to java.lang.Object at java.lang.reflect.Field.set(Field.java:764) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197) ... 10 more If I understand correctly, for some reason, the deserializer used for SomeKey returns Object (before filling it), ignoring the fact that SomeKey extends a BarKey, and then fails when trying to assign it to the parent class. What is the correct approach for this situation? Thanks, Ido Complete code example: public class BarKey implements Serializable { public List<Long> valueList; public BarKey() { } public BarKey(long value) { super(); valueList = new ArrayList<>(); valueList.add(value); } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } BarKey barKey = (BarKey) o; return Objects.equals(valueList, barKey.valueList); } @Override public int hashCode() { return Objects.hash(valueList); } } public class SomeKey extends BarKey implements Serializable { public Integer banana=1; public SomeKey() { } public SomeKey(long value) { super(value); } } public class Foo<SomeKey extends BarKey> implements Serializable { public Foo() {} public SomeKey someKey; public Foo(SomeKey someKey) { this.someKey = someKey; } } public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable { public FooFoo() { } public Integer grill = 12; public FooFoo(SomeKey someKey) { super(someKey); } } class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable { @Override public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception { out.collect(new FooFoo<>(new SomeKey((long) value))); } } class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable { @Override public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception { value.someKey.valueList.add(1L); out.collect(value); } } class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable { @Override public BarKey getKey(Foo<SomeKey> value) throws Exception { return value.someKey; } } class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> { private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class); public long dosomething = 0; @Override public void invoke(Foo<BarKey> value) throws Exception { dosomething += value.someKey.valueList.size(); logger.warn("Sink {}", dosomething); } } Test code: environment.registerType(FooFoo.class); // Not certain if this is needed List<Integer> intlist = new ArrayList<>(); intlist.add(3); intlist.add(5); DataStreamSource<Integer> streamSource = environment.fromCollection(intlist); streamSource.process(new MakeFoo()) .keyBy(new FooBarSelector<>()) .process(new FooProcessor()) .addSink(new FooBarSink()); environment.execute("Jobname-UT");