You need to differentiate two serialization abstractions (which I guess you already know). The first comes into play when reading from the source: the DeserializationSchema translates the bytes coming from Kafka into something that Flink can handle.
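To make that first abstraction concrete, here is a minimal sketch; MyEvent and its parseFrom helper are hypothetical stand-ins for your own type and wire format:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical event POJO; stands in for whatever your topic carries.
class MyEvent {
    public String payload;

    static MyEvent parseFrom(byte[] bytes) {
        MyEvent event = new MyEvent();
        event.payload = new String(bytes, StandardCharsets.UTF_8); // placeholder decoding
        return event;
    }
}

class MyEventSchema implements DeserializationSchema<MyEvent> {

    @Override
    public MyEvent deserialize(byte[] message) throws IOException {
        return MyEvent.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false; // a Kafka topic is unbounded
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        // This is the hook into the second abstraction below: Flink derives the
        // TypeSerializer for network transfer and state from this TypeInformation.
        return TypeInformation.of(MyEvent.class);
    }
}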
The second kind of serialization happens within Flink itself, through the TypeSerializer, which Flink uses to pass data from one subtask to another. That's why your custom DeserializationSchema needs to provide TypeInformation: it allows Flink to pick the TypeSerializer. For arbitrary types you will probably not be able to provide consistent TypeInformation, and Flink has to fall back to Kryo, as you said. One solution is to also provide a custom TypeSerializer that uses the schema registry (I wouldn't go the route of GenericRecords with embedded schemas again). Note that because of the missing TypeInformation, you will never be able to use the Table API or SQL. If you ask me, that's quite a few drawbacks for one approach: no schema enforcement, no proper schema evolution support, no schema compatibility enforcement, custom serializers, and clumsy code with lots of string-based field accesses and casts.

---

I forgot to highlight another rather simple approach that works quite well for very generic workflows with few operations: use byte[]. The DeserializationSchema then really is as trivial as it sounds. You pass byte[] all along until you reach your FlatMap (assuming you are doing some filtering or validation), and only inside this FlatMap do you deserialize into Avro, apply your custom logic, and serialize back into byte[]. You can use the Table API / SQL later on with UDFs that do the same thing. Using byte[] as Flink's internal serialization format is also blazingly fast (there is not much to do except adding a header). The only downside is that you need to deserialize manually in each operator, but that can usually be factored out. A sketch of both pieces follows below the quoted mail.

I'd still recommend looking into using a single schema that captures all events as subschemas.

On Thu, Nov 12, 2020 at 4:15 PM ashwinkonale <ashwin.kon...@gmail.com> wrote:

> So in this case, flink will fall back to default kyro serialiser right ?
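Here is a rough, untested sketch of the byte[] approach. It assumes plain Avro binary without the Confluent wire-format header (with the schema registry you would call its (de)serializer in the same two places), and the "id" check is a made-up placeholder for your real validation:

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// At the source, the "deserialization" is just the identity function.
class PassThroughSchema extends AbstractDeserializationSchema<byte[]> {
    @Override
    public byte[] deserialize(byte[] message) {
        return message;
    }
}

// Bytes stay bytes until this operator; only here do we pay for Avro.
class ValidateInAvro extends RichFlatMapFunction<byte[], byte[]> {

    // Keep the schema as a JSON string so the function serializes cleanly;
    // the Avro reader/writer are rebuilt per task in open().
    private final String schemaJson;
    private transient GenericDatumReader<GenericRecord> reader;
    private transient GenericDatumWriter<GenericRecord> writer;

    ValidateInAvro(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        Schema schema = new Schema.Parser().parse(schemaJson);
        reader = new GenericDatumReader<>(schema);
        writer = new GenericDatumWriter<>(schema);
    }

    @Override
    public void flatMap(byte[] value, Collector<byte[]> out) throws Exception {
        GenericRecord record =
                reader.read(null, DecoderFactory.get().binaryDecoder(value, null));

        if (record.get("id") == null) { // made-up validation: drop records without an id
            return;
        }

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
        writer.write(record, encoder);
        encoder.flush();
        out.collect(bytes.toByteArray());
    }
}

Wiring it up is then something like env.addSource(new FlinkKafkaConsumer<>(topic, new PassThroughSchema(), kafkaProps)).flatMap(new ValidateInAvro(schemaJson)); everything before and after the FlatMap is shuffled as raw bytes.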