Hi guys, In our flink job we use java source for deserializing a message from kafka using a kafka deserializer. Signature is as below.
public class CustomAvroDeserializationSchema implements KafkaDeserializationSchema<Tuple2<EventMetaData,GenericRecord>> The other parts of the streaming job are in scala. When data has to be serialized I get this exception *java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)* Here is how I provide type info for serialization in the java deserialization class: @Override public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() { return new TupleTypeInfo(TypeInformation.of(EventMetaData.class),new GenericRecordAvroTypeInfo(this .writer)); Here is how I add the kafka source in scala : private[flink] def sourceType( deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)], properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = { val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)]( source.asJava, deserialization, properties) consumer } Any idea thoughts on how to interoperate between java tuple2 and scala case class ? Also using 1.9.1 version of flink-connector-kafka while the rest of the cluster uses 1.7.2 version of flink. Best, Nick.