Yes, there is some magic in the KryoSerializer and other serialisers that detects whether the flink-avro dependency is on the classpath and then uses special TypeSerializers from there.
(Specifically, this is AvroUtils, which has a default implementation that doesn't do much and a special implementation, AvroKryoSerializerUtils, that is available in flink-avro and is loaded dynamically.)
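If it helps to picture it, the lookup is essentially a reflective "try the flink-avro class, otherwise fall back to a no-op default" pattern. The sketch below is a simplified Scala rendering of that idea, not the actual Flink code; the object and method names are my own placeholders and the fully qualified class name is from memory, so treat it as illustration only:

object AvroUtilsLookup {

  // Assumed location of the flink-avro implementation (check the sources of your Flink version).
  private val AvroKryoUtils = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils"

  // If flink-avro is on the classpath, instantiate its Kryo utils reflectively;
  // otherwise fall back to a default implementation that registers nothing Avro-specific.
  def getAvroUtils: AnyRef =
    try {
      Class
        .forName(AvroKryoUtils, false, Thread.currentThread().getContextClassLoader)
        .newInstance()
        .asInstanceOf[AnyRef]
    } catch {
      case _: ClassNotFoundException =>
        new Object // stand-in for the "doesn't do much" default implementation
    }
}

So in practice it should be enough to add the flink-avro dependency to your job (something like "org.apache.flink" % "flink-avro" % "<your Flink version>" in sbt; double-check the exact artifact name for your version), and the Kryo serializer picks up the Avro-specific registrations (including, as far as I remember, the GenericData handling) automatically, which matches what you observed.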
> On 5. Jan 2018, at 18:53, Kyle Hamlin <hamlin...@gmail.com> wrote:
>
> So I just added the dependency but didn't change the getProducedType method and it worked fine. Would you expect that to be the case?
>
> On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> Yes, that should do the trick.
>
>> On 5. Jan 2018, at 18:37, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>
>> I can add that dependency. So I would replace
>>
>> override def getProducedType: TypeInformation[T] = {
>>   TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>> }
>>
>> with something like:
>>
>> override def getProducedType: TypeInformation[T] = {
>>   new AvroTypeInfo(classOf[T])
>> }
>>
>> On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>>
>> I think you might be able to use AvroTypeInfo which you can use by including the flink-avro dependencies. Is that an option for you?
>>
>> Best,
>> Aljoscha
>>
>>> On 3. Jan 2018, at 21:34, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type, and when I remove the array field the error is not thrown. Is there any way around this without using a specific type?
>>>
>>> Avro Schema:
>>> {
>>>   "type": "record",
>>>   "name": "Recieved",
>>>   "fields": [
>>>     {"name": "id", "type": "int"},
>>>     {"name": "time", "type": "long"},
>>>     {"name": "urls", "type": {"type": "array", "items": "string"}}
>>>   ]
>>> }
>>>
>>> Deserializer:
>>> import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
>>> import org.apache.avro.generic.{GenericData, GenericRecord}
>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>> import org.apache.flink.api.java.typeutils.TypeExtractor
>>> import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
>>>
>>> import scala.collection.JavaConverters._
>>> import scala.reflect.ClassTag
>>>
>>> class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {
>>>
>>>   @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer()
>>>     deserializer.configure(
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>>       true)
>>>     deserializer
>>>   }
>>>
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer()
>>>     deserializer.configure(
>>>       // other schema-registry configuration parameters can be passed, see the configure() code
>>>       // for details (among other things, schema cache size)
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>>
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte], topic: String, partition: Int, offset: Long): T = {
>>>     valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>>>   }
>>>
>>>   override def isEndOfStream(nextElement: T): Boolean = false
>>>
>>>   override def getProducedType: TypeInformation[T] = {
>>>     TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>>>   }
>>>
>>> }
>>>
>>> Stacktrace:
>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>> Using address localhost:6123 to connect to JobManager.
>>> JobManager web interface address http://localhost:8082
>>> Starting execution of program
>>> Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for job completion.
>>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>> 01/03/2018 15:19:57   Job execution switched to status RUNNING.
>>> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> 01/03/2018 15:19:57   Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
>>> 01/03/2018 15:19:59   Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>> Serialization trace:
>>> values (org.apache.avro.generic.GenericData$Record)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     ... 20 more
>>>
>>> 01/03/2018 15:19:59   Job execution switched to status FAILING.