Hi Yohei,
In general, this code is correct but it requires additional casting and
extracting the result of Kafka.read() into local variable, like this:
KafkaIO.Read read = KafkaIO.<Long,
MyClass>read().withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder((Class) KafkaAvroDeserializer.class,
AvroCoder.of(MyClass.class));
p.apply(read);
The whole code example is here:
https://github.com/aromanenko-dev/beam-issues/blob/master/io/src/main/java/org/apache/beam/issues/io/KafkaAvro.java
<https://github.com/aromanenko-dev/beam-issues/blob/master/io/src/main/java/org/apache/beam/issues/io/KafkaAvro.java>
Another option is to extend KafkaAvroDeserializer and override your own
deserialize() method which will return proper type (MyClass) instead of default
Object.
> On 30 Apr 2019, at 08:18, Yohei Onishi <[email protected]> wrote:
>
> Hi,
>
> I am currently developing an application that consumes Avro format message
> from Kafka. I am facing the following error. Can anybody help me fix it?
>
> My code looks like this. I am trying to deserialize Avro format message to
> MyClass (POJO) using KafkaAvroDeserializer
>
> public class MyClassConsumer {
> public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline p = Pipeline.create(options);
> PCollection<KV<Long, MyClass>> input = p.apply(KafkaIO.<Long,
> Object>read()
> .withKeyDeserializer(LongDeserializer.class)
> .withValueDeserializerAndCoder(KafkaAvroDeserializer.class,
> AvroCoder.of(MyClass.class))
> );
> p.run();
> }
> }
> @DefaultCoder(AvroCoder.class)
> public class MyClass{
> String name;
> String age;
> MyClass(){
> }
> MyClass(String n, String a) {
> this.name <http://this.name/>= n;
> this.age= a;
> }
> }
>
> but I got this error.
>
> Compilation failure
> [ERROR]
> /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17]
> incompatible types: inference variable T has incompatible equality
> constraints java.lang.Object,examples.MyClass
>
> This is the same issue this stackdriver question mentioned.
> https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam#comment98480658_54756442
>
> <https://stackoverflow.com/questions/54755668/how-to-deserialising-kafka-avro-messages-using-apache-beam#comment98480658_54756442>
>
> If you find a solution please let me know.
>
> Yohei Onishi