I see. I created this PR [1] to ask for feedback from a reviewer who knows Avro in Beam better.

-Rui

[1]: https://github.com/apache/beam/pull/8376

On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm <bmvish...@gmail.com> wrote:

> Hi Rui,
>
> I checked the AvroUtils code. There is a static initializer block that
> registers the Avro timestamp conversion functions for the logical type
> timestamp-millis.
>
> *// Code snippet below*
> static {
>   // This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling
>   // of DateTime types.
>   SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>   GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
> }
>
> Because of this, when a generic record from Kafka is deserialized using
> KafkaAvroDeserializer, the long value produced at the producer end gets
> converted to joda-time during deserialization.
>
> Next, when we try to convert this GenericRecord to a Row as part of the
> AvroUtils.toBeamRowStrict function, we again try to convert the value
> received to joda-time.
> But an exception is thrown because there is a type cast to Long.
>
> *// Code snippet below:*
> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>   // <-- ClassCastException is thrown here, as we are casting from JodaTime to Long
>   return convertDateTimeStrict((Long) value, fieldType);
> }
>
> private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
>   checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
>   return new Instant(value); // <-- Creates a JodaTime instance here
> }
>
> *Thanks & Regards,*
> *Vishwas*
>
> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote:
>
>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
>> GenericRecord and tries to cast them based on their types (and
>> casts the object to long for "timestamp-millis"). See [1].
>>
>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>> GenericRecord should have a long for "timestamp-millis".
>>
>> The schema you pasted looks right. Not sure why the generated class uses Joda
>> time (is it controlled by some flags?). But at least you could write a
>> small function to do the schema conversion for your needs.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>
>> Rui
>>
>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>
>>> Hi Rui,
>>>
>>> I agree that by converting it to long, there will be no error.
>>> But KafkaIO is giving a GenericRecord with an attribute of type
>>> JodaTime. Now I convert it to long. Then AvroUtils.toBeamRowStrict
>>> converts it to JodaTime again.
>>>
>>> I used the avro tools 1.8.2 jar for the schema below, and I see that the
>>> generated class has a JodaTime attribute.
>>>
>>> {
>>>   "name": "timeOfRelease",
>>>   "type": {
>>>     "type": "long",
>>>     "logicalType": "timestamp-millis",
>>>     "connect.version": 1,
>>>     "connect.name": "org.apache.kafka.connect.data.Timestamp"
>>>   }
>>> }
>>>
>>> *Attribute type in generated class:*
>>> private org.joda.time.DateTime timeOfRelease;
>>>
>>> So I am not sure why this type casting is required.
>>>
>>> *Thanks & Regards,*
>>> *Vishwas*
>>>
>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> From reading the code, it seems that, as the logical type
>>>> "timestamp-millis" implies, it expects millis as a Long for values under
>>>> this logical type.
>>>>
>>>> So if you can convert joda-time to millis before calling
>>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>>>> will be gone.
>>>>
>>>> -Rui
>>>>
>>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> +dev <d...@beam.apache.org>
>>>>>
>>>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Below is my pipeline:
>>>>>>
>>>>>> KafkaSource (KafkaIO.read) ------> ParDo ------> BeamSql ------> KafkaSink (KafkaIO.write)
>>>>>>
>>>>>> The Avro schema of the topic has a field of logical type
>>>>>> timestamp-millis. The KafkaIO.read transform is creating a
>>>>>> KafkaRecord<String,GenericRecord>, where this field is being converted to
>>>>>> joda-time.
>>>>>>
>>>>>> In my ParDo transform, I am trying to use the AvroUtils class methods
>>>>>> to convert the generic record to a Beam Row, and I get the class cast
>>>>>> exception below for the joda-time attribute.
>>>>>>
>>>>>> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>>>
>>>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>>>>>>   at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>>>   at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>>>
>>>>>> I have opened a jira, https://issues.apache.org/jira/browse/BEAM-7073,
>>>>>> for this.
>>>>>>
>>>>>> *Thanks & Regards,*
>>>>>> *Vishwas*
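
For reference, the workaround suggested in this thread (convert the joda-time value back to epoch millis before calling `AvroUtils.toBeamRowStrict`) could look roughly like the sketch below. It is deliberately dependency-free so that it runs on its own: a `Map<String, Object>` stands in for the Avro `GenericRecord`, and the hypothetical `MillisCarrier` interface stands in for `org.joda.time.DateTime`, which exposes `getMillis()`. The class and method names are made up for illustration and are not part of Beam, Avro, or Joda-Time.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: every name in this class is hypothetical, not a Beam/Avro/Joda API.
final class TimestampMillisWorkaround {

  // Stand-in for org.joda.time.DateTime, which exposes getMillis().
  interface MillisCarrier {
    long getMillis();
  }

  // Returns epoch millis as a Long when the value is a date-time object;
  // any other value (including an existing Long) is returned unchanged.
  static Object toEpochMillis(Object value) {
    if (value instanceof MillisCarrier) {
      return Long.valueOf(((MillisCarrier) value).getMillis());
    }
    return value;
  }

  // Copies the record (a Map standing in for GenericRecord), replacing
  // date-time values with Long millis so a later (Long) cast succeeds.
  static Map<String, Object> normalize(Map<String, Object> record) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : record.entrySet()) {
      out.put(entry.getKey(), toEpochMillis(entry.getValue()));
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> record = new LinkedHashMap<>();
    MillisCarrier dateTime = () -> 1555305540000L; // arbitrary example timestamp
    record.put("timeOfRelease", dateTime);

    Map<String, Object> fixed = normalize(record);
    // This cast is the one that failed in the stack trace; it is safe now.
    Long millis = (Long) fixed.get("timeOfRelease");
    System.out.println(millis);
  }
}
```

With the real libraries, the same idea would be applied to the `GenericRecord` inside the ParDo, checking `instanceof org.joda.time.DateTime` and touching only the fields whose Avro schema carries the `timestamp-millis` logical type.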