Hi Debasish,

It would be a bit tedious, but to override the default AvroSerializer you could pass a TypeInformation object explicitly wherever the type is needed. You would need to implement your own MyAvroTypeInfo to use in place of the provided AvroTypeInfo.
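To make the idea concrete, here is a minimal, self-contained sketch of the override pattern. The types below are simplified stand-ins, not the real Flink classes: Serializer models TypeSerializer, AvroTypeInfoModel models AvroTypeInfo, and MyAvroSerializer is a hypothetical name for your own serializer. In actual Flink code you would presumably extend AvroTypeInfo and override its createSerializer(ExecutionConfig) method instead.

```java
// Stand-in for Flink's TypeSerializer; NOT the real Flink class.
interface Serializer<T> {
    String describe();
}

// Models the default AvroSerializer handed out by the stock type info.
final class DefaultAvroSerializer<T> implements Serializer<T> {
    private final Class<T> type;
    DefaultAvroSerializer(Class<T> type) { this.type = type; }
    @Override public String describe() {
        return "DefaultAvroSerializer<" + type.getSimpleName() + ">";
    }
}

// Hypothetical custom serializer (the one you want used everywhere).
final class MyAvroSerializer<T> implements Serializer<T> {
    private final Class<T> type;
    MyAvroSerializer(Class<T> type) { this.type = type; }
    @Override public String describe() {
        return "MyAvroSerializer<" + type.getSimpleName() + ">";
    }
}

// Stand-in for AvroTypeInfo: the type info decides which serializer is created.
class AvroTypeInfoModel<T> {
    private final Class<T> typeClass;
    AvroTypeInfoModel(Class<T> typeClass) { this.typeClass = typeClass; }
    Class<T> getTypeClass() { return typeClass; }
    Serializer<T> createSerializer() { return new DefaultAvroSerializer<>(typeClass); }
}

// The custom type info: same type, but it returns the custom serializer.
class MyAvroTypeInfo<T> extends AvroTypeInfoModel<T> {
    MyAvroTypeInfo(Class<T> typeClass) { super(typeClass); }
    @Override Serializer<T> createSerializer() { return new MyAvroSerializer<>(getTypeClass()); }
}

class SerializerOverrideDemo {
    public static void main(String[] args) {
        // Stock type info yields the default serializer.
        System.out.println(new AvroTypeInfoModel<>(String.class).createSerializer().describe());
        // Custom type info swaps in the custom serializer.
        System.out.println(new MyAvroTypeInfo<>(String.class).createSerializer().describe());
    }
}
```

The point of the sketch is only that the serializer choice lives in the type info, so supplying a different type info at each call site swaps the serializer without touching any global registration.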
For example:

    env.addSource(kafkaConsumer)
       .returns(new MyAvroTypeInfo<>(SomeAvro.class));

This should override the default AvroSerializer.

Or, when using an operator:

    .window(EventTimeSessionWindows.withGap(joinGap))
    .apply(new JoinFunction(), new MyAvroTypeInfo<>(SomeAvro.class));

Another benefit of this approach over the Kryo serializer option is that you would support state migration.

Hope this helps,
Rafi

On Tue, May 14, 2019 at 10:23 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> Hi Konstantin -
>
> I did take a look at the option you mentioned. Using that option I can
> register a custom serializer for a custom type. But my requirement is a bit
> different - I would like to have a custom AvroSerializer for *all* types
> that extend Avro's SpecificRecordBase. The reason is that I would like an
> Avro serializer that bypasses the problems mentioned in
> https://issues.apache.org/jira/browse/FLINK-12501. I have such a
> serializer (it's not a Kryo serializer, though) and I would like to use it
> in place of AvroSerializer.
>
> regards.
>
> On Tue, May 14, 2019 at 12:38 PM Konstantin Knauf <konstan...@ververica.com> wrote:
>
>> Hi Debasish,
>>
>> this should be possible via
>>
>>     env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,
>>         MyCustomSerializer.class);
>>
>> You can check that the correct serializer is used with
>>
>>     TypeInformation.of(MyCustomType.class).createSerializer(env.getConfig());
>>
>> In this case your serializer needs to implement Kryo's Serializer
>> interface. Alternatively, you can have a look at the @TypeInfo annotation [2].
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/custom_serializers.html
>>
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html
>>
>> On Mon, May 13, 2019 at 6:50 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Hello -
>>>
>>> I am using Avro based encoding with Flink. I see that Flink has an
>>> AvroSerializer that gets used for serializing Avro. Is it possible to
>>> provide a custom implementation of the serializer, e.g. I want to use
>>> MyAvroSerializer instead of AvroSerializer in *all* places. Is there any
>>> way to register such a custom serializer?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>
>> --
>> Konstantin Knauf | Solutions Architect
>
> --
> Debasish Ghosh