Hi,
I'm trying to read data from a Kafka topic that holds Avro-encoded
records.
I wrote the following code:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    Schema schema = ReflectData.get().getSchema(User.class);
    FlinkKafkaConsumer<GenericRecord> userConsumer = new FlinkKafkaConsumer<>(
            "test_topic",
            // First
            AvroDeserializationSchema.forGeneric(schema),
            // Second
            // ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
            //         "http://xxx.xx.xxx.xx:8081"),
            getConsumerProperties());
    DataStream<GenericRecord> userStream =
            env.addSource(userConsumer).name("UserSource").uid("UserSourceUID");
    userStream.print("users");
    env.execute();
}
As far as I understand, there are two ways to deserialize the records:
1. AvroDeserializationSchema.forGeneric(schema)
2. ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081")
In both cases I get the schema with ReflectData.get().getSchema(User.class).
Flink gurus, please tell me whether I am on the right track.
With the first option I get the following error:
java.io.EOFException
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:510)
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:150)
    at org.apache.avro.io.ValidatingDecoder.readInt(ValidatingDecoder.java:82)
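One possible explanation for this EOFException, offered as an assumption rather than a confirmed diagnosis: if the records were written with the Confluent Avro serializer, each message starts with a 5-byte header (a magic byte 0x0 followed by a 4-byte big-endian schema ID) that a plain Avro decoder would misread as data. The sketch below is a stdlib-only heuristic for checking a raw Kafka payload for that header. The class name ConfluentHeaderCheck and the method schemaIdOf are hypothetical helpers, not part of Flink or Avro, and the sample byte arrays are made up for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Flink, Avro, or the Confluent client.
class ConfluentHeaderCheck {
    // Confluent wire format: 1 magic byte (0x0), then a 4-byte big-endian schema ID,
    // then the Avro binary body.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    /**
     * Returns the schema ID if the payload starts with a Confluent-style header,
     * or -1 otherwise. This is only a heuristic: a plain Avro payload can also
     * happen to start with a zero byte.
     */
    static int schemaIdOf(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, which matches the header layout.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 2, 10}; // made-up payload with header, schema ID 42
        byte[] plain = {2, 10, 6};               // made-up payload without header
        System.out.println(schemaIdOf(framed));
        System.out.println(schemaIdOf(plain));
    }
}
```

If the check returns a schema ID for real messages from the topic, the second option (the registry-aware deserializer) is likely the one that matches how the data was written, and the remaining problem would be the reader schema rather than the framing.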
With the second option I get the following error:
Caused by: org.apache.avro.AvroTypeException: Found user_visit.Envelope, expecting cep.model.User, missing required field userId
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
How can I get the correct result?
Sorry if this duplicates my earlier post:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DataStream-lt-GenericRecord-gt-from-kafka-topic-td42607.html
This is the third day I have been working on this issue.