Hi Niels,
the type handling has evolved over the years and is a bit messy across
the different layers. You are almost right with your last assumption:
"Is the provided serialization via TypeInformation 'skipped' during
startup and only used during runtime?" The type extraction returns a
Kryo type, and that Kryo type uses the configured default serializers
at runtime. That is why the log entry is just an INFO and not a
WARNING. You did everything correctly.
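To make this concrete, here is a minimal sketch of the mechanism (Foo
and FooSerializer are placeholder names, not from your code):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoTypeDemo {

    // A class that fails the POJO analysis (public final field, no setter).
    public static class Foo {
        public final int value;
        public Foo(int value) { this.value = value; }
    }

    // A custom Kryo serializer for it.
    public static class FooSerializer extends Serializer<Foo> {
        @Override
        public void write(Kryo kryo, Output output, Foo foo) {
            output.writeInt(foo.value);
        }
        @Override
        public Foo read(Kryo kryo, Input input, Class<Foo> type) {
            return new Foo(input.readInt());
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().registerTypeWithKryoSerializer(Foo.class, FooSerializer.class);

        // Type extraction falls back to a generic (Kryo) type and logs
        // the INFO messages you saw ...
        TypeInformation<Foo> typeInfo = TypeInformation.of(Foo.class);

        // ... but the serializer is created from the ExecutionConfig at
        // runtime, so it honors the registered FooSerializer.
        TypeSerializer<Foo> serializer = typeInfo.createSerializer(env.getConfig());
        System.out.println(serializer); // a KryoSerializer
    }
}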
By the way, there is also the possibility to insert a custom type into
the type extraction by using Type Factories [0]; see the sketch below.
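For illustration, a rough sketch of such a factory (MyType and
MyTypeFactory are made-up names; the factory could also return a fully
custom TypeInformation instead of a generic one):

// MyType.java -- the annotation makes the TypeExtractor ask the
// factory instead of running its own POJO analysis.
import org.apache.flink.api.common.typeinfo.TypeInfo;

@TypeInfo(MyTypeFactory.class)
public class MyType {
    public int field;
}

// MyTypeFactory.java
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class MyTypeFactory extends TypeInfoFactory<MyType> {
    @Override
    public TypeInformation<MyType> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // Decide here which TypeInformation the extractor should use.
        return new GenericTypeInfo<>(MyType.class);
    }
}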
Maybe as a side comment: we are aware of this confusion, and the Table
& SQL API will hopefully no longer use the TypeExtractor in 1.10. This
is what I am working on at the moment.
Regards,
Timo
[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/types_serialization.html#defining-type-information-using-a-factory
On 08.07.19 at 14:17, Niels Basjes wrote:
Hi,
Context:
I'm looking into making the Google (BigQuery-compatible) HyperLogLog++
implementation available in Flink, because it is simply an
Apache-licensed open source library:
- https://issuetracker.google.com/issues/123269269
- https://issues.apache.org/jira/browse/BEAM-7013
- https://github.com/google/zetasketch
While doing this I noticed that, even though I provided an explicit
Kryo serializer for the core class, i.e. I did

senv.getConfig().registerTypeWithKryoSerializer(
    HyperLogLogPlusPlus.class, HLLSerializer.class);

I still see messages like this when registering a new
UserDefinedFunction (AggregateFunction / ScalarFunction) that has this
class as either input or output:
13:59:57,316 [INFO ] TypeExtractor : 1815:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
getter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor : 1818:
class com.google.zetasketch.HyperLogLogPlusPlus does not contain a
setter for field allowedTypes
13:59:57,317 [INFO ] TypeExtractor : 1857:
Class class com.google.zetasketch.HyperLogLogPlusPlus cannot be used
as a POJO type because not all fields are valid POJO fields, and must
be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance.
So it is complaining about serialization performance even though the
serialization was explicitly configured in a different way.
Then I noticed that I see similar messages in other situations too.
In this code
https://github.com/nielsbasjes/yauaa/blob/master/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/DemonstrationOfTumblingTableSQLFunction.java#L165
I see
13:59:58,478 [INFO ] TypeExtractor : 1815:
class org.apache.flink.types.Row does not contain a getter for field
fields
13:59:58,478 [INFO ] TypeExtractor : 1818:
class org.apache.flink.types.Row does not contain a setter for field
fields
13:59:58,479 [INFO ] TypeExtractor : 1857:
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
even though a full TypeInformation instance for that type was provided:

TypeInformation<Row> tupleType = new RowTypeInfo(
    SQL_TIMESTAMP, STRING, STRING, STRING, STRING, LONG);
DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
I checked with my debugger, and in both of the examples above the code
IS using the correct serialization classes at runtime.
So what is happening here?
Did I forget to do a required call?
Or is this a bug?
Is the provided serialization via TypeInformation 'skipped' during
startup and only used during runtime?