Hi,
We’re using Flink 1.3.1, and we’re trying to pass a POJO that has a
generic field through the pipeline (see details in the complete
example below).
We have the class Foo<SomeKey extends BarKey>, and when sending a
subclass with a specific SomeKey, we get the following exception:
java.lang.RuntimeException: Cannot instantiate class.
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ….BarKey field …Foo.someKey to java.lang.Object
    at java.lang.reflect.Field.set(Field.java:764)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
    ... 10 more
If I understand correctly, the serializer used for the someKey field
creates a plain Object (before filling it), ignoring the fact that
SomeKey extends BarKey, and the assignment to the field declared in
the parent class Foo then fails.
What is the correct approach for this situation?
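For what it's worth, the inner IllegalArgumentException can be reproduced with plain reflection and no Flink at all. The sketch below uses trimmed copies of BarKey and Foo (constructors, equals and hashCode left out); ErasureDemo and its helper methods are made-up names for illustration only. It shows that the runtime type of Foo.someKey is BarKey and that Field.set rejects a plain Object. It does not show why the serializer produces an Object in the first place.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the job above.
public class ErasureDemo {

    // Trimmed copy of BarKey from this mail.
    public static class BarKey implements Serializable {
        public List<Long> valueList = new ArrayList<>();
    }

    // Trimmed copy of Foo; "SomeKey" here is a type variable, not a class.
    public static class Foo<SomeKey extends BarKey> implements Serializable {
        public SomeKey someKey;
    }

    private static Field someKeyField() {
        try {
            return Foo.class.getField("someKey");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Runtime (erased) type of the field: the bound of the type variable. */
    public static Class<?> erasedFieldType() {
        return someKeyField().getType();
    }

    /** Generic type as reflection sees it: the name of the unresolved type variable. */
    public static String genericFieldType() {
        return someKeyField().getGenericType().getTypeName();
    }

    /** Returns true if storing a plain Object in the field is rejected. */
    public static boolean settingPlainObjectFails() {
        try {
            someKeyField().set(new Foo<BarKey>(), new Object());
            return false;
        } catch (IllegalArgumentException expected) {
            return true; // same exception type as the "Caused by" in the trace
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("erased type:  " + erasedFieldType().getSimpleName());
        System.out.println("generic type: " + genericFieldType());
        System.out.println("set(Object) rejected: " + settingPlainObjectFails());
    }
}
```

So whatever creates the initial field value has to produce at least a BarKey; an instance of java.lang.Object can never be assigned to this field.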
Thanks,
Ido
Complete code example:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        super();
        valueList = new ArrayList<>();
        valueList.add(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}

public class SomeKey extends BarKey implements Serializable {
    public Integer banana = 1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value);
    }
}

// Note: "SomeKey" below is a type parameter that shadows the SomeKey class.
public class Foo<SomeKey extends BarKey> implements Serializable {

    public Foo() {}
    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }

}

public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;
    public FooFoo(SomeKey someKey) {
        super(someKey);
    }

}

import java.io.Serializable;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Emits a FooFoo (subclass of Foo) holding a SomeKey (subclass of BarKey).
class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}

class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        value.someKey.valueList.add(1L);
        out.collect(value);
    }
}

class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);
    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}

Test code:
environment.registerType(FooFoo.class); // Not certain if this is needed
List<Integer> intlist = new ArrayList<>();
intlist.add(3);
intlist.add(5);
DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);
streamSource.process(new MakeFoo())
    .keyBy(new FooBarSelector<>())
    .process(new FooProcessor())
    .addSink(new FooBarSink());
environment.execute("Jobname-UT");
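
One more observation that may be relevant to registerType(FooFoo.class): the raw class FooFoo carries no binding for its type parameter, so reflection alone cannot tell which key type it holds. The sketch below is plain Java, not Flink code. BoundTypeDemo, SomeKeyFoo and typeArgumentIsConcrete are hypothetical names, and the type variable is renamed to K so it is not confused with the class SomeKey. I have not verified whether introducing such a subclass changes Flink 1.3.1's behaviour.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical demo class, not part of the job above.
public class BoundTypeDemo {

    public static class BarKey {}
    public static class SomeKey extends BarKey {}

    public static class Foo<K extends BarKey> {
        public K someKey;
    }

    // Still generic: K is passed through unresolved, as in the original FooFoo.
    public static class FooFoo<K extends BarKey> extends Foo<K> {}

    // Assumption for illustration: a subclass that fixes the key type.
    public static class SomeKeyFoo extends FooFoo<SomeKey> {}

    /**
     * Returns true if the first type argument that clazz passes to its
     * superclass is a concrete class rather than a type variable.
     */
    public static boolean typeArgumentIsConcrete(Class<?> clazz) {
        Type superType = clazz.getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            return false; // superclass is not generic
        }
        Type argument = ((ParameterizedType) superType).getActualTypeArguments()[0];
        return argument instanceof Class;
    }

    public static void main(String[] args) {
        System.out.println("FooFoo binds the key type:     " + typeArgumentIsConcrete(FooFoo.class));
        System.out.println("SomeKeyFoo binds the key type: " + typeArgumentIsConcrete(SomeKeyFoo.class));
    }
}
```

For FooFoo.class the argument is still the type variable, while for SomeKeyFoo.class it is the SomeKey class itself, so any reflection-based analysis has a concrete type to work with only in the second case.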