Hi Krzysztof,

Your problems arise from Java type erasure. If DataPoint has a field of type 
Map<String, BadPojo>, all Flink's type system will see is a plain Map, i.e. 
Map<Object, Object>. 
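
As a small illustration of erasure itself (plain Java, no Flink involved), the sketch below shows that two maps with different type arguments share the same runtime class. The names ErasureDemo and sameRuntimeClass are made up for this example, and it says nothing about how Flink's type extractor works internally:

```java
import java.util.HashMap;
import java.util.Map;

public class ErasureDemo {

    /** Stand-in for the BadPojo class from the question. */
    static class BadPojo {}

    /** True when both maps have exactly the same runtime class, whatever their type arguments. */
    static boolean sameRuntimeClass(Map<?, ?> a, Map<?, ?> b) {
        return a.getClass() == b.getClass();
    }

    public static void main(String[] args) {
        Map<String, BadPojo> typed = new HashMap<>();
        Map<Integer, Integer> other = new HashMap<>();

        // The instance only knows that it is a HashMap; the type arguments are erased.
        System.out.println(typed.getClass().getName());     // prints "java.util.HashMap"
        System.out.println(sameRuntimeClass(typed, other)); // prints "true"
    }
}
```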

So in the first case, with DataPoint having an explicit member of type 
"BadPojo", Flink will deduce "DataPoint" to be a PojoType with three fields, 
where the field "badPojo" itself is of type GenericType<BadPojo>, and 
thus that field will be serialized via Kryo. 

In the second case, DataPoint will still be a PojoType and your "badPojo" field 
will also be a GenericType, but this time of type "GenericType<java.util.Map>". 
So there are no complaints about BadPojo here, because the entire map is already 
serialized via Kryo; Flink doesn't analyze the map any further and never sees 
BadPojo. => No win for you :)  

In the second case, you need to explicitly tell Flink that your "badPojo" field 
is a map and should be treated as a "MapType". Once Flink detects it as a 
MapType, it will again complain about BadPojo itself, so you are back to 
square one and still need to fix BadPojo to finally avoid Kryo. :)  
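
To make "fixing BadPojo" a bit more concrete, here is a minimal sketch in plain Java. GoodPojo is a hypothetical repaired variant (public class, public no-argument constructor, non-final field with getter and setter), and looksLikePojo is only a rough approximation of the POJO rules from the Flink documentation that I wrote for illustration. It is not Flink's actual check, and it ignores details such as "is" getters for booleans and inherited fields:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    /** BadPojo as in the question: private final field without getter or setter. */
    public static class BadPojo {
        private final String fieldA = "X";
    }

    /** Hypothetical fixed variant: non-final field, reachable via getter and setter. */
    public static class GoodPojo {
        private String fieldA = "X";

        public GoodPojo() {}

        public String getFieldA() { return fieldA; }

        public void setFieldA(String fieldA) { this.fieldA = fieldA; }
    }

    /** Rough approximation of the documented POJO rules, NOT Flink's real implementation. */
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // a public no-argument constructor is required
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue; // static and transient fields are skipped, public fields are fine
            }
            if (!hasGetterAndSetter(clazz, field)) {
                return false;
            }
        }
        return true;
    }

    /** Looks for public getX() and setX(type) methods matching the given field. */
    static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(BadPojo.class));  // prints "false"
        System.out.println(looksLikePojo(GoodPojo.class)); // prints "true"
    }
}
```

Whether Flink really treats a class as a PojoType is best confirmed with Flink itself, e.g. by checking that the "does not contain a getter" log entry from TypeExtractor is gone.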

I once wrote myself a small utility function when I had to tell Flink about a 
POJO from an external library that contained a Map, in order to serialize it 
efficiently:



    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.PojoTypeInfo;

    /**
     * Flink's Types.POJO function accepts, next to a class, a Map of field names to types.
     * Whereas Types.POJO creates the POJO type for the class with only the specified fields,
     * this utility creates the POJO type normally, but replaces the type of the given field
     * with the provided type and keeps all other fields as they are generated normally.
     *
     * @throws org.apache.flink.api.common.functions.InvalidTypesException if the field does not exist
     */
    public static <T, V> TypeInformation<T> pojoTypeWithElementReplacement(
            Class<T> pojo, String replacementFieldName, TypeInformation<V> replacementFieldType) {
        // Let Flink analyze the class as usual first.
        final PojoTypeInfo<T> pojoType = (PojoTypeInfo<T>) Types.POJO(pojo);

        // Collect the automatically extracted type of every field, keyed by field name.
        final Map<String, TypeInformation<?>> pojoFieldTypes = IntStream
                .range(0, pojoType.getArity())
                .mapToObj(fieldNr -> pojoType.getPojoFieldAt(fieldNr))
                .collect(Collectors.toMap(
                        pojoField -> pojoField.getField().getName(),
                        pojoField -> pojoField.getTypeInformation()
                ));

        // Swap in the custom type for the requested field; fail if the field is unknown.
        final TypeInformation<?> oldTypeForElements = pojoFieldTypes.remove(replacementFieldName);
        if (oldTypeForElements == null) {
            throw new org.apache.flink.api.common.functions.InvalidTypesException(
                    "Expected " + replacementFieldName
                            + " field to exist in order to replace it properly with custom type infos");
        }
        pojoFieldTypes.put(replacementFieldName, replacementFieldType);

        return Types.POJO(pojo, pojoFieldTypes);
    }


In your case, you could call it like this (with the utility living in a class 
named FlinkTypeHints):

    TypeInformation<DataPoint> pojoMapType =
            FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
                    Types.MAP(Types.STRING, TypeExtractor.createTypeInfo(BadPojo.class)));

Or, a bit less verbose, if BadPojo really were a PojoType:

    TypeInformation<DataPoint> pojoMapType =
            FlinkTypeHints.pojoTypeWithElementReplacement(DataPoint.class, "badPojo",
                    Types.MAP(Types.STRING, Types.POJO(BadPojo.class)));

If the POJO is, e.g., returned from a MapFunction, you can write something like:

    stream.map(myMapFunctionReturningDataPoint).returns(pojoMapType);



Note that I wrote this for Flink 1.9. I have read that newer Flink versions come 
with a new type system, but I haven't looked into it yet and have no idea 
what changed. 

Best regards
Theo




----- Original Message -----
From: "KristoffSC" <krzysiek.chmielew...@gmail.com>
To: "user" <user@flink.apache.org>
Sent: Monday, 13 July 2020 23:06:20
Subject: Flink Pojo Serialization for Map Values

Hi,
I would like to ask about the Flink POJO serialization described in [1].

I have a case where my custom event source produces events described by
this POJO:

public class DataPoint
{
        public long timestamp;
        public double value;
        public BadPojo badPojo = new BadPojo();

        public DataPoint() {}

}

where the BadPojo class is something like this:
public class BadPojo {

    private final String fieldA = "X";
}

So this is a case where Flink, using the default configuration, should fall back to
Kryo, and it does.
In the logs I can see entries like:
org.apache.flink.api.java.typeutils.TypeExtractor             - class
org.home.streaming.events.BadPojo does not contain a getter for field fieldA

So this is an expected result.

However, when I change the DataPoint class to use:
public Map<String, BadPojo> badPojo = new HashMap<>();

instead of a direct BadPojo field, I no longer see logs complaining about the
BadPojo class.

In this case DataPoint class looks like this:
public class DataPoint
{
        public long timestamp;
        public double value;
        public Map<String, BadPojo> badPojo = new HashMap<>();

        public DataPoint() {}

}

My questions:
1. What actually happens here?
2. Which serializer is used by Flink?
3. How should Maps be handled in a POJO definition to get the best serialization
performance (assuming that I do need access to that map)?

Thanks,
Krzysztof


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/