Hi guys,
In our Flink job we use a Java class to deserialize messages from Kafka
via a KafkaDeserializationSchema. The signature is as follows:

public class CustomAvroDeserializationSchema implements
        KafkaDeserializationSchema<Tuple2<EventMetaData, GenericRecord>>

The other parts of the streaming job are written in Scala. When the data
has to be serialized, I get this exception:

java.lang.RuntimeException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to scala.Product
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

Here is how I provide the type information for serialization in the Java
deserialization class:

@Override
public TypeInformation<Tuple2<EventMetaData, GenericRecord>> getProducedType() {
    // "writer" is the Avro Schema describing the GenericRecord payload
    return new TupleTypeInfo<>(
            TypeInformation.of(EventMetaData.class),
            new GenericRecordAvroTypeInfo(this.writer));
}
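
If it helps to pin down the mismatch: my suspicion is that the Scala side
implicitly derives a CaseClassTypeInfo for the tuple, and its serializer
casts each record to scala.Product, while the schema above emits a Java
Tuple2, which is not a Product. A minimal sketch of the derivation I mean
(the implicit macro comes with the org.apache.flink.streaming.api.scala._
import):

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// What the Scala API derives for the tuple type: a CaseClassTypeInfo
// whose serializer treats every record as a scala.Product. The Java
// schema, however, emits org.apache.flink.api.java.tuple.Tuple2, which
// does not implement Product, hence the ClassCastException above.
val scalaTupleInfo: TypeInformation[(EventMetaData, GenericRecord)] =
  createTypeInformation[(EventMetaData, GenericRecord)]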

Here is how I add the Kafka source in Scala:

private[flink] def sourceType(
    deserialization: KafkaDeserializationSchema[(EventMetaData, GenericRecord)],
    properties: Properties): FlinkKafkaConsumer[(EventMetaData, GenericRecord)] = {
  val consumer = new FlinkKafkaConsumer[(EventMetaData, GenericRecord)](
    source.asJava,
    deserialization,
    properties)
  consumer
}

Any thoughts on how to interoperate between the Java Tuple2 and the Scala
case class (tuple)? For example, would an explicit conversion right after
the source, along the lines of the sketch below, be a sane bridge? Also,
we are using version 1.9.1 of flink-connector-kafka, while the rest of the
cluster runs Flink 1.7.2.
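
A rough, untested sketch of that idea follows; env and javaTypedConsumer
are hypothetical, where javaTypedConsumer would be a FlinkKafkaConsumer
built against the Java Tuple2 type the schema actually produces:

import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.scala._

// Untested sketch: keep the Java Tuple2 at the source, passing the
// consumer's own produced type explicitly so the GenericRecordAvroTypeInfo
// from getProducedType() is preserved, then convert to a Scala tuple
// immediately so downstream operators see the scala.Product they expect.
val javaTupleStream: DataStream[JTuple2[EventMetaData, GenericRecord]] =
  env.addSource(javaTypedConsumer)(javaTypedConsumer.getProducedType)

val stream: DataStream[(EventMetaData, GenericRecord)] =
  javaTupleStream.map(t => (t.f0, t.f1)) // Java Tuple2 to Scala tuple

If I read the Scala addSource right, the implicitly derived TypeInformation
takes precedence over the consumer's getProducedType, which may also be
part of what we are seeing; passing it explicitly would sidestep that.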

Best,
Nick.
