Hi,

it seems the data was written with the Confluent Schema Registry in mind, so you cannot use option 1: the Kafka record value is not plain Avro, because it starts with a 5-byte prefix (a magic byte followed by a 4-byte schema ID) that identifies the writer schema.
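To illustrate that prefix, here is a minimal, dependency-free Java sketch of how a record value in the Confluent wire format is laid out: byte 0 is a magic byte (0), bytes 1 to 4 are the schema ID as a big-endian int, and the rest is the Avro binary payload. A plain AvroDeserializationSchema starts decoding at byte 0, so it reads the header as if it were record data, which is a plausible cause of the EOFException. The class and method names (ConfluentFraming, parse) and the sample bytes are hypothetical and only serve the illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: splits a Confluent-framed Kafka value into schema ID and Avro payload.
final class ConfluentFraming {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // Schema ID plus the raw Avro bytes that follow the header.
    static final class Framed {
        final int schemaId;
        final byte[] avroPayload;

        Framed(int schemaId, byte[] avroPayload) {
            this.schemaId = schemaId;
            this.avroPayload = avroPayload;
        }
    }

    static Framed parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for Confluent framing");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        // ByteBuffer reads big-endian by default, matching the wire format.
        int schemaId = buffer.getInt();
        // Only these remaining bytes are actual Avro binary data.
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new Framed(schemaId, payload);
    }

    public static void main(String[] args) {
        // Hypothetical value: magic byte, schema ID 42, then 3 payload bytes.
        byte[] value = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x02, 0x04, 0x06};
        Framed framed = parse(value);
        System.out.println("schema id = " + framed.schemaId);                 // prints: schema id = 42
        System.out.println("payload length = " + framed.avroPayload.length);  // prints: payload length = 3
    }
}
```

In a Flink job you should not need to do this by hand: ConfluentRegistryAvroDeserializationSchema strips the header and fetches the writer schema from the registry by its ID; you only have to supply a reader schema that matches the data.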
So the second way is the way to go, and it actually works as intended: the AvroTypeException tells you that the records were written with the schema user_visit.Envelope, while you are reading them with cep.model.User. Once you use the correct schema (user_visit.Envelope), it will work.

On Wed, Mar 31, 2021 at 1:46 PM Matthias Pohl <[email protected]> wrote:

> Hi Maminspapin again,
>
> have you checked whether your topic actually contains data that matches
> your schema specified through cep.model.User?
>
> Best,
> Matthias
>
> On Tue, Mar 30, 2021 at 3:39 PM Maminspapin <[email protected]> wrote:
>
>> Hi,
>>
>> I'm trying to read data from a topic. This topic holds Avro-format data.
>>
>> I wrote the following code:
>>
>> public static void main(String[] args) throws Exception {
>>
>>     StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>     Schema schema = ReflectData.get().getSchema(User.class);
>>     FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
>>             "test_topic",
>>             // First
>>             AvroDeserializationSchema.forGeneric(schema),
>>             // Second
>>             // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081"),
>>             getConsumerProperties());
>>
>>     DataStream<GenericRecord> userStream =
>>             env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
>>     userStream.print("users");
>>
>>     env.execute();
>> }
>>
>> So, as I understand it, there are two ways to get the result:
>> 1. AvroDeserializationSchema.forGeneric(schema)
>> 2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://xxx.xx.xxx.xx:8081")
>>
>> And I use ReflectData.get().getSchema(User.class) to get the schema.
>>
>> Please, Flink gurus, tell me whether I am on the right track or not.
>>
>> If I use the first way, I get the following error:
>>
>> java.io.EOFException
>>     at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
>>     at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
>>     at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
>>
>> If I use the second way, I get the following error:
>>
>> Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope,
>> expecting cep.model.User, missing required field userId
>>     at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>>     at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>>
>> How can I get the correct result?
>>
>> Sorry if this is a duplicate:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
>>
>> This is the third day I've been working on this issue... (((
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
