Yes, there is some magic in the KryoSerializer and other serializers that 
detects whether the flink-avro dependency is on the classpath and then uses 
special TypeSerializers from that module.

(Specifically, this is AvroUtils, which has a default implementation that 
doesn't do much and a special implementation called AvroKryoSerializerUtils 
that ships in flink-avro and is loaded dynamically via reflection.)
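
If you're curious about the mechanism, it's roughly the reflective-loading 
pattern below. This is a simplified sketch with made-up trait and method 
names, not the actual Flink source; the real entry point is AvroUtils and the 
real implementation class shipped in flink-avro is 
org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.

// Simplified sketch of the dynamic-loading trick (hypothetical names):
trait AvroSupport {
  def registerAvroSerializers(): Unit
}

// Default used when flink-avro is not on the classpath: does nothing.
class NoOpAvroSupport extends AvroSupport {
  override def registerAvroSerializers(): Unit = ()
}

object AvroSupport {
  // The real implementation class lives in the flink-avro module.
  private val ImplClass = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils"

  // Load the Avro-aware implementation if it is on the classpath,
  // otherwise fall back to the no-op default.
  def load(): AvroSupport =
    try Class.forName(ImplClass).newInstance().asInstanceOf[AvroSupport]
    catch {
      case _: ClassNotFoundException => new NoOpAvroSupport
    }
}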

> On 5. Jan 2018, at 18:53, Kyle Hamlin <hamlin...@gmail.com> wrote:
> 
> So I just added the dependency but didn't change the getProducedType method 
> and it worked fine. Would you expect that to be the case?
> 
> On Fri, Jan 5, 2018 at 5:43 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> Yes, that should do the trick.
> 
> 
>> On 5. Jan 2018, at 18:37, Kyle Hamlin <hamlin...@gmail.com> wrote:
>> 
>> I can add that dependency. So I would replace
>> 
>> override def getProducedType: TypeInformation[T] = {
>>   TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>> }
>> 
>> with something like:
>> 
>> override def getProducedType: TypeInformation[T] = {
>>   // classOf[T] doesn't compile for a type parameter; go through the ClassTag instead
>>   new AvroTypeInfo(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>> }
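>> 
>> To get AvroTypeInfo on the classpath I'd add flink-avro to the build, 
>> something like the sbt line below (the coordinates and version are my 
>> assumption and should match the Flink version in use):
>> 
>> // build.sbt: pulls in AvroTypeInfo and the Avro-aware Kryo serializer utils
>> // (version is an assumption; match it to your Flink version)
>> libraryDependencies += "org.apache.flink" % "flink-avro" % "1.4.0"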
>> 
>> On Thu, Jan 4, 2018 at 11:08 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>> 
>> I think you might be able to use AvroTypeInfo, which becomes available once 
>> you include the flink-avro dependency. Is that an option for you?
>> 
>> Best,
>> Aljoscha
>> 
>> 
>>> On 3. Jan 2018, at 21:34, Kyle Hamlin <hamlin...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> It appears that Kryo can't properly extract/deserialize Avro array types. I 
>>> have a very simple Avro schema that has an array type and when I remove the 
>>> array field the error is not thrown. Is there any way around this without 
>>> using a specific type?
>>> 
>>> Avro Schema:
>>> {
>>>     "type": "record",
>>>     "name": "Recieved",
>>>     "fields": [
>>>         {"name": "id", "type": "int"},
>>>         {"name": "time", "type": "long"},
>>>         {"name": "urls", "type": {"type": "array", "items": "string"}}
>>>     ]
>>> }
>>> 
>>> Deserializer:
>>> import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>> import org.apache.flink.api.java.typeutils.TypeExtractor
>>> import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
>>> 
>>> import scala.collection.JavaConverters._
>>> import scala.reflect.ClassTag
>>> 
>>> class AvroDeserializer[T <: GenericRecord : ClassTag](schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {
>>> 
>>>   @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer()
>>>     deserializer.configure(
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>>       true)
>>>     deserializer
>>>   }
>>> 
>>>   // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
>>>   @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
>>>     val deserializer = new KafkaAvroDeserializer()
>>>     deserializer.configure(
>>>       // other schema-registry configuration parameters can be passed, see the configure() code
>>>       // for details (among other things, schema cache size)
>>>       Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
>>>       false)
>>>     deserializer
>>>   }
>>> 
>>>   override def deserialize(messageKey: Array[Byte], message: Array[Byte],
>>>                            topic: String, partition: Int, offset: Long): T = {
>>>     valueDeserializer.deserialize(topic, message).asInstanceOf[T]
>>>   }
>>> 
>>>   override def isEndOfStream(nextElement: T): Boolean = false
>>> 
>>>   override def getProducedType: TypeInformation[T] = {
>>>     TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
>>>   }
>>> }
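>>> 
>>> For reference, I wire it into the job roughly like this (a simplified 
>>> sketch; the topic name, bootstrap servers, group id, and schema-registry 
>>> URL are placeholders, and it assumes the Kafka 0.10 connector that shows 
>>> up in the stack trace below):
>>> 
>>> import java.util.Properties
>>> import org.apache.avro.generic.GenericRecord
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
>>> 
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> 
>>> val props = new Properties()
>>> props.setProperty("bootstrap.servers", "localhost:9092") // placeholder
>>> props.setProperty("group.id", "avro-test")               // placeholder
>>> 
>>> // FlinkKafkaConsumer010 accepts a KeyedDeserializationSchema directly
>>> val consumer = new FlinkKafkaConsumer010[GenericRecord](
>>>   "my-topic",                                                   // placeholder
>>>   new AvroDeserializer[GenericRecord]("http://localhost:8081"), // placeholder registry URL
>>>   props)
>>> 
>>> env.addSource(consumer).print()
>>> env.execute("avro-consumer")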
>>> Stacktrace:
>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>> Using address localhost:6123 to connect to JobManager.
>>> JobManager web interface address http://localhost:8082
>>> Starting execution of program
>>> Submitting job with JobID: d9ed8f58fceaae253b84fc86e4b6fa3a. Waiting for 
>>> job completion.
>>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>> 01/03/2018 15:19:57 Job execution switched to status RUNNING.
>>> 01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to SCHEDULED
>>> 01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to DEPLOYING
>>> 01/03/2018 15:19:57 Source: Kafka -> Sink: Unnamed(1/1) switched to RUNNING
>>> 01/03/2018 15:19:59 Source: Kafka -> Sink: Unnamed(1/1) switched to FAILED
>>> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
>>> Serialization trace:
>>> values (org.apache.avro.generic.GenericData$Record)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
>>>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>     ... 20 more
>>> 
>>> 01/03/2018 15:19:59 Job execution switched to status FAILING.
>> 
> 
