Hi All! We are trying to work with Avro-serialized data from Kafka using the Table API, with a TIMESTAMP column type.
According to the docs <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>, we can use the long type with logicalType: timestamp-millis. So we use the following Avro field schema in the descriptor:

    {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}

When trying to insert into the table, we get the following error:

    Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long are in module java.base of loader 'bootstrap')
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)

It seems that the Avro format (serializer) is not aware of the logical type conversion needed to convert the value back to the physical type long.

I looked at AvroTypesITCase, which uses all kinds of logical types, but I could only find logic that maps between Avro POJOs and tables, and none that actually exercises the serialization/deserialization logic of the format.

Could someone please help me with this? Maybe what I am trying to do is not possible, or I just missed a crucial step.

Thank you!
Gyula
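
P.S. To make the expectation concrete, here is a minimal sketch of the conversion I would expect to happen somewhere before the value reaches GenericDatumWriter. It is not Flink or Avro code: the class and method names (TimestampMillisSketch, toEpochMillis, fromEpochMillis) are made up for illustration, it only uses java.time, and it assumes the LocalDateTime should be interpreted as UTC, which may not match what the format actually intends.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of Flink or Avro: shows the mapping between
// the table-side value (LocalDateTime) and the Avro physical type (long)
// for a field declared with logicalType timestamp-millis.
public class TimestampMillisSketch {

    // LocalDateTime -> milliseconds since the epoch.
    // Assumption: the value is interpreted as UTC.
    static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Milliseconds since the epoch -> LocalDateTime (again assuming UTC).
    // floorDiv/floorMod keep the result correct for negative timestamps.
    static LocalDateTime fromEpochMillis(long millis) {
        long seconds = Math.floorDiv(millis, 1000L);
        int nanos = (int) Math.floorMod(millis, 1000L) * 1_000_000;
        return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
        long physical = toEpochMillis(ts);          // what the writer needs
        LocalDateTime back = fromEpochMillis(physical);
        System.out.println(physical + " <-> " + back);
    }
}
```

With a conversion like this applied to timestamp_field before writing, the writer would receive a Long rather than a LocalDateTime, which is the cast that fails in the stack trace above.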