Hi, Below is my pipeline:
KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql ---------------> KafkaSink(KafkaIO.write) The avro schema of the topic has a field of logical type timestamp-millis. KafkaIO.read transform is creating a KafkaRecord<String,GenericRecord>, where this field is being converted to joda-time. In my Pardo transform, I am trying to use the AvroUtils class methods to convert the generic record to Beam Row and getting below class cast exception for the joda-time attribute. AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema) Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664) at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217) I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073 for this *Thanks & Regards,* *Vishwas *