I didn't find any code in `AvroUtils.toBeamRowStrict` that converts a long to Joda time. `AvroUtils.toBeamRowStrict` retrieves each object from the GenericRecord and casts it based on its type; for "timestamp-millis" it casts the object to long. See [1].
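Because of that cast, a field that arrives as a date-time object has to be turned into epoch millis before the record is handed over. Below is a minimal, stdlib-only sketch of that normalization step. It is an illustration, not Beam or Avro code: the class `TimestampNormalizer` and the method `toEpochMillis` are hypothetical names, a plain `Map<String, Object>` stands in for the record's fields, and `java.time.Instant` stands in for `org.joda.time.DateTime` so that the example compiles without extra dependencies.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical helper, not part of Beam or Avro.
public class TimestampNormalizer {

    /**
     * Returns a copy of {@code fields} in which every value that is an
     * instance of {@code timestampClass} is replaced by its epoch
     * milliseconds (boxed as a Long). All other values are copied unchanged.
     */
    public static <T> Map<String, Object> toEpochMillis(
            Map<String, Object> fields,
            Class<T> timestampClass,
            ToLongFunction<T> toMillis) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            Object value = e.getValue();
            if (timestampClass.isInstance(value)) {
                // Replace the date-time object with a plain long value.
                out.put(e.getKey(), toMillis.applyAsLong(timestampClass.cast(value)));
            } else {
                out.put(e.getKey(), value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the fields of a record read from Kafka.
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("title", "example");
        fields.put("timeOfRelease", Instant.ofEpochMilli(1555380660000L));

        Map<String, Object> normalized =
                toEpochMillis(fields, Instant.class, Instant::toEpochMilli);
        System.out.println(normalized);
    }
}
```

With Joda on the classpath, the same helper could presumably be called with `DateTime.class` and `DateTime::getMillis`, and the map could be replaced by reads and writes through `GenericRecord.get` and `GenericRecord.put`; that variant is untested here.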
So in order to use `AvroUtils.toBeamRowStrict`, the generated GenericRecord should hold a long for "timestamp-millis". The schema you pasted looks right. I'm not sure why the generated class uses Joda time (is it controlled by some flag?). But at least you could write a small function to do the schema conversion for your needs.

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672

Rui

On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:

> Hi Rui,
>
> I agree that by converting it to long, there will be no error.
> But the KafkaIO is giving a GenericRecord with an attribute of type JodaTime.
> Now I convert it to long. Then AvroUtils.toBeamRowStrict converts it to
> JodaTime again.
>
> I used the avro tools 1.8.2 jar for the below schema, and I see that the
> generated class has a JodaTime attribute.
>
> {
>   "name": "timeOfRelease",
>   "type": {
>     "type": "long",
>     "logicalType": "timestamp-millis",
>     "connect.version": 1,
>     "connect.name": "org.apache.kafka.connect.data.Timestamp"
>   }
> }
>
> *Attribute type in generated class:*
> private org.joda.time.DateTime timeOfRelease;
>
> So not sure why this type casting is required.
>
> *Thanks & Regards,*
>
> *Vishwas*
>
> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>
>> From reading the code, it seems that, as the logical type
>> "timestamp-millis" implies, it expects millis as a Long for values
>> under this logical type.
>>
>> So if you can convert joda-time to millis before calling
>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your
>> exception will be gone.
>>
>> -Rui
>>
>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> +dev <d...@beam.apache.org>
>>>
>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Below is my pipeline:
>>>>
>>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql
>>>> ---------------> KafkaSink(KafkaIO.write)
>>>>
>>>> The avro schema of the topic has a field of logical type
>>>> timestamp-millis. The KafkaIO.read transform is creating a
>>>> KafkaRecord<String,GenericRecord>, where this field is being converted
>>>> to joda-time.
>>>>
>>>> In my Pardo transform, I am trying to use the AvroUtils class methods
>>>> to convert the generic record to a Beam Row, and I am getting the below
>>>> class cast exception for the joda-time attribute.
>>>>
>>>> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>
>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>>> be cast to java.lang.Long
>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>
>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>>> for this.
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas*