Sorry for not finishing the last - 4th - question 4) When Kyro fallback is enabled, that means fallback is the last resort if the object can be serialized with any other serializer like `PojoSerializer`? Can we still benefit from the performance of Flink built-in serializer without sacrifying the flexibility of Kyro?
On Wed, Feb 2, 2022 at 11:22 AM Deniz Koçak <lend...@gmail.com> wrote: > > Hi Yoel, > > Thank you for answering my question, really appreciated. However, that > brings my mind more questions. > > 1) As far as I understood from the Flink built-in serialization, if a > class is a POJO type wrt the items defined in POJOs section here [1], > if an object is an instance of a POJO type class, no need to do > anything extra. Serialization can be handled via `PojoSerializer` > naively? In our case with `java.utils.UUID`, that class has no > no-arguments public constructor which cause the fallback to Kyro I > guess? > > 2) In your suggestion `you could provide a supplier of UUIDGenerator`, > I am not sure what exactly you refer here? Something like a lambda > which is capable of creating an instance of `UUIDGenerator` on demand, > which can be called within open() method? > > 3) `java.util.UUID` class also implements `java.io.Serializable`, but > as far as I understood that does not mean a class implementing > `java.io.Serializable` is always be able to be serialized via Flink? > At least with the built-in serializers (no referring Kyro)? UUID class > is java Serializable, though still seems to be a generic type because > does not fall into any of the categories mentioned in [1]? > > 4) When Kyro fallback is enabled, that means fallback is the last > resort if the object can be serialized with > > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/ > > On Wed, Feb 2, 2022 at 6:27 AM Yoel Benharrous > <yoel.benharr...@gmail.com> wrote: > > > > Hi Deniz, > > > > You could declare UUIDGenerator as a transient field and instanciate it in > > the open > > function > > > > Ot if you want to inject any UUIDGenerator you could provide a supplier of > > UUIDGenerator that should implement Serializable and invoke it in the open > > function. > > > > > > > > > > On Tue, Feb 1, 2022, 10:01 PM Deniz Koçak <lend...@gmail.com> wrote: > >> > >> Hi All, > >> > >> We have a function extending `KeyedCoProcessFunction` and within that > >> function implementation. I wanted to keep a class object as a field > >> which is simply responsible for generating a UUID. > >> > >> We disabled Kyro fallback for generic types via > >> `env.getConfig().disableGenericTypes()`. > >> I am receiving the error below when I pass an instance of > >> `UUIDGenerator` to the `KeyedCoProcessFunction` and try to run the > >> job. > >> > >> *********************************************************************************** > >> Exception in thread "main" java.lang.UnsupportedOperationException: > >> Generic types have been disabled in the ExecutionConfig and type > >> java.util.UUID is treated as a generic type. > >> *********************************************************************************** > >> > >> At that point I wonder how can/should I do to pass an instance of > >> `UUIDGenerator` to `PrebetModelRequestProcessor` function. > >> > >> > >> ModelRequestProcessor Class: > >> ======================================================== > >> public class ModelRequestProcessor extends > >> KeyedCoProcessFunction<String, CustomerInformation, CustomerRisk, > >> ModelRequest> { > >> > >> protected final UUIDGenerator uuidGenerator; > >> > >> public PrebetModelRequestProcessor(UUIDGenerator generator) { > >> this.uuidGenerator = generator; > >> } > >> } > >> ======================================================== > >> > >> UUIDGenerator Class: > >> ======================================================== > >> public class UUIDGenerator implements Serializable { > >> > >> private static final long serialVersionUID = 42L; > >> > >> public java.util.UUID generate() { > >> return java.util.UUID.randomUUID(); > >> } > >> } > >> ========================================================