Hi, I'm trying to solve a task with getting data from topic. This topic keeps avro format data.
I wrote next code: public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Schema schema = ReflectData.get().getSchema(User.class); FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>( "test_topic", *// First* AvroDeserializationSchema.forGeneric(schema), *// Second* // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081"), getConsumerProperties()); DataStream<GenericRecord> userStream = env.addSource(userConsumer).name("UserSource").uid("UserSourceUID"); userStream.print("users"); env.execute(); } So, as I think right, there are two ways to get the result: 1. AvroDeserializationSchema.forGeneric(schema) 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081") And I use ReflectData.get().getSchema(User.class) to get schema. Please, Flink guru, tell me if I am on the right way or not. If I use First way, there is next error: java.io.EOFException at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510) at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150) at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82) If I use Second way, there is next error: Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope, expecting cep.model.User, missing required field userId at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308) at org.apache.avro.io.parsing.Parser.advance(Parser.java:86) How can I get the correct result? Sorry, if duplicated: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html Today is third day I'm working with this issue.... ((( -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/