Hi,

We're using flink 1.3.1, and we're trying to pass through the pipeline a POJO 
object that has a generic field )see details in the complete example below):
We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with 
a specific SomeKey, we get the following exception:


java.lang.RuntimeException: Cannot instantiate class.
              at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
              at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
              at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
              at 
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
              at 
org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
              at 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
              at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
              at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
              at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
              at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ....BarKey field 
...Foo.someKey to java.lang.Object
              at java.lang.reflect.Field.set(Field.java:764)
              at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
              at 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
              ... 10 more


If I understand correctly, for some reason, the deserializer used for SomeKey 
returns Object (before filling it), ignoring the fact that SomeKey extends a 
BarKey, and then fails when trying to assign it to the parent class.
What is the correct approach for this situation?

Thanks,
Ido



Complete code example:
public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        super();
        valueList = new ArrayList<>();
        valueList.add(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}


public class SomeKey extends BarKey implements Serializable {
    public Integer banana=1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value);
    }
}


public class Foo<SomeKey extends BarKey> implements Serializable {

    public Foo() {}
    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }

}

public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements 
Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;
    public FooFoo(SomeKey someKey) {
        super(someKey);
    }

}
class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements 
Serializable {
    @Override
    public void processElement(Integer value, Context ctx, 
Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}


class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements 
Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, 
Collector<Foo<BarKey>> out) throws Exception {
        value.someKey.valueList.add(1L);
        out.collect(value);
    }
}

class FooBarSelector<SomeKey extends BarKey> implements 
KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = 
LoggerFactory.getLogger(FooBarSink.class);
    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}


Test code:

        environment.registerType(FooFoo.class); // Not certain if this is needed

        List<Integer> intlist = new ArrayList<>();
        intlist.add(3);
        intlist.add(5);

        DataStreamSource<Integer> streamSource = 
environment.fromCollection(intlist);

        streamSource.process(new MakeFoo())
             .keyBy(new FooBarSelector<>())
             .process(new FooProcessor())
             .addSink(new FooBarSink());

        environment.execute("Jobname-UT");

Reply via email to