Hi,

I think you might be able to use AvroTypeInfo, which becomes available when you include the flink-avro dependency. Is that an option for you?
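For reference, here is a rough sketch of what that could look like. It makes a couple of assumptions: Flink 1.4, where AvroTypeInfo lives in the flink-avro module under org.apache.flink.formats.avro.typeutils, and a SpecificRecord class generated from your schema, which I'll hypothetically call Received. AvroTypeInfo works with generated Avro classes rather than plain GenericRecord, so the deserializer's type parameter would need to be bound to SpecificRecordBase instead:

  // build.sbt (sketch; match the version to your Flink distribution)
  libraryDependencies += "org.apache.flink" % "flink-avro" % "1.4.0"

  // In the deserialization schema, hand Flink an Avro-aware TypeInformation
  // instead of letting the TypeExtractor fall back to a Kryo-backed generic type:
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.formats.avro.typeutils.AvroTypeInfo

  override def getProducedType: TypeInformation[Received] =
    new AvroTypeInfo(classOf[Received])  // Received: hypothetical class generated from the schema below

With that, records of this type are handled by Flink's Avro serializer, so the Kryo code path from your stack trace is not taken for them.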
Best,
Aljoscha

> On 3. Jan 2018, at 21:34, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> Hi,
>
> It appears that Kryo can't properly extract/deserialize Avro array types. I
> have a very simple Avro schema that has an array type, and when I remove the
> array field the error is not thrown. Is there any way around this without
> using a specific type?
>
> Avro Schema:
> {
>   "type": "record",
>   "name": "Recieved",
>   "fields": [
>     {"name": "id", "type": "int"},
>     {"name": "time", "type": "long"},
>     {"name": "urls", "type": {"type": "array", "items": "string"}}
>   ]
> }
>
> Deserializer:
> import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
> import org.apache.avro.generic.{GenericData, GenericRecord}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.api.java.typeutils.TypeExtractor
> import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
>
> import scala.collection.JavaConverters._
> import scala.reflect.ClassTag
>
> class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String)
>   extends KeyedDeserializationSchema[T] {
>
>   @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
>     val deserializer = new KafkaAvroDeserializer()
>     deserializer.configure(
>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>       true)
>     deserializer
>   }
>
>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>   @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
>     val deserializer = new KafkaAvroDeserializer()
>     deserializer.configure(
>       // other schema-registry configuration parameters can be passed, see the configure() code
>       // for details (among other things, schema cache size)
>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>       false)
>     deserializer
>   }
>
>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>                            topic: String, partition: Int, offset: Long): T = {
>     valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>   }
>
>   override def isEndOfStream(nextElement: T): Boolean = false
>
>   override def getProducedType: TypeInformation[T] = {
>     TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>   }
>
> }
>
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 15:19:57  Job execution switched to status RUNNING.
> 01/03/2018 15:19:57  Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
> 01/03/2018 15:19:57  Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
> 01/03/2018 15:19:57  Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
> 01/03/2018 15:19:59  Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> values (org.apache.avro.generic.GenericData$Record)
>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
>   at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   ... 20 more
>
> 01/03/2018 15:19:59  Job execution switched to status FAILING.
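For what it's worth, the NullPointerException itself is most likely Kryo instantiating org.apache.avro.generic.GenericData$Array without running any of its constructors, so the array's internal state is still null when CollectionSerializer calls add(). The records end up in Kryo in the first place because the quoted getProducedType can only produce a generic type for the GenericRecord interface. A minimal sketch that illustrates this:

  // Sketch: for an interface like GenericRecord the TypeExtractor cannot build a
  // POJO type, so it returns a GenericTypeInfo, whose runtime serializer is the
  // KryoSerializer that appears in the stack trace above.
  import org.apache.avro.generic.GenericRecord
  import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TypeExtractor}

  object TypeInfoCheck extends App {
    val info = TypeExtractor.getForClass(classOf[GenericRecord])
    println(info.isInstanceOf[GenericTypeInfo[_]]) // expected: true
  }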