Hi Debasish,

It would be a bit tedious, but in order to override the default
AvroSerializer you could specify a TypeInformation object where needed.
You would need to implement your own MyAvroTypeInfo instead of the provided
AvroTypeInfo.

For example:

env.addSource(kafkaConsumer)
        .returns(new MyAvroTypeInfo<>(SomeAvro.class)

This should override the default AvroSerializer.

Or when using some operator:

.window(EventTimeSessionWindows.withGap(joinGap))
.apply(new JoinFunction(), new MyAvroTypeInfo<>(SomeAvro.class));


Another benefit of this approach over the Kryo serializer option is that
you would support state migration.

Hope this helps,
Rafi


On Tue, May 14, 2019 at 10:23 AM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi Konstantin -
>
> I did take a look at the option you mentioned. Using that option I can
> register a custom serializer for a custom type. But my requirement is a bit
> different - I would like to have a custom AvroSerializer for *all* types
> which implement SpecificRecordBase of Avro. The reason is I would like an
> avro serializer that bypasses the problems mentioned in
> https://issues.apache.org/jira/browse/FLINK-12501. I have such a
> serializer (it's not Kryo serializer though) and I would like to use it in
> place of AvroSerializer.
>
> regards.
>
> On Tue, May 14, 2019 at 12:38 PM Konstantin Knauf <
> konstan...@ververica.com> wrote:
>
>> Hi Debasish,
>>
>> this should be possible via
>>
>> env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class,
>> MyCustomSerializer.class);
>>
>> You can check that the correct serializer is used with
>>
>> TypeInformation.of(MyCustomType.class).createSerializer(env.getConfig());
>>
>> In this case your serializer needs to implements Kryo's serializer 
>> interface. Alternatively, you can have a look at @TypeInfo Annotation [1].
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/custom_serializers.html
>>
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html
>>
>>
>> On Mon, May 13, 2019 at 6:50 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hello -
>>>
>>> I am using Avro based encoding with Flink. I see that Flink has an
>>> AvroSerializer that gets used for serializing Avro. Is it possible to
>>> provide a custom implementation of the serializer e.g. I want to use
>>> MyAvroSerializer instead of AvroSerializer in *all* places. Is there any
>>> way to register such a custom serializer ?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Planned Absences: 20. - 21.06.2019
>>
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Data Artisans GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

Reply via email to