[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747820#comment-15747820 ]
Alexander Chermenin commented on FLINK-5319: -------------------------------------------- It seems there is a Java bug. I used the following piece of code and got the same result: {code}package org.sample.flink.examples.mappers; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; class Task<IN, OUT> { interface Mapper<IN, OUT> extends Serializable { OUT map(IN value); } private byte[] bytes; private IN input; private Task(Mapper<IN, OUT> mapper, IN input) { this.bytes = serializeObject(mapper); this.input = input; } public static void main(String[] args) { Task<Long, Double> longTask = new Task<>(Number::doubleValue, 1L); Task<Integer, Double> intTask = new Task<>(Number::doubleValue, 1); System.out.println(longTask.exec()); System.out.println(intTask.exec()); } private static Object deserializeObject(byte[] bytes) { try (ObjectInputStream oois = new ObjectInputStream(new ByteArrayInputStream(bytes))) { return oois.readObject(); } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); return null; } } private static byte[] serializeObject(Object o) { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(o); oos.flush(); return baos.toByteArray(); } catch (IOException e) { e.printStackTrace(); return new byte[0]; } } @SuppressWarnings("unchecked") private OUT exec() { Mapper mapper = (Mapper) deserializeObject(bytes); return (OUT) mapper.map(input); } }{code} Exception: {code}Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long at org.sample.flink.examples.mappers.Task.exec(Task.java:55) at org.sample.flink.examples.mappers.Task.main(Task.java:28){code} > ClassCastException when reusing an inherited method reference as KeySelector > for different classes > 
-------------------------------------------------------------------------------------------------- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.2.0 > Reporter: Alexander Chermenin > Assignee: Timo Walther > > Code sample: > {code}static abstract class A { > int id; > A(int id) {this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource<B> bStream = environment.fromElements(bs); > DataStreamSource<C> cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector<B, Integer>) A::getId).print(); > cStream.keyBy((KeySelector<C, Integer>) A::getId).print(); > environment.execute(); > } > {code} > This code throws the following exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72) > ... 11 more > Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C > cannot be cast to org.sample.flink.examples.Test$B > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59) > ... 15 more{code} > This problem occurs when we use a method reference as a KeySelector; there > are no problems when we use an anonymous class or a lambda. -- This message was sent by Atlassian JIRA (v6.3.4#6332)