PR #8376 is merged and should be in the 2.14.0 release.
-Rui

On Mon, Apr 22, 2019 at 10:49 AM Rui Wang <ruw...@google.com> wrote:

> I see. I created this PR [1] to ask for feedback from a reviewer who knows
> more about Avro in Beam.
>
> -Rui
>
> [1]: https://github.com/apache/beam/pull/8376
>
> On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>
>> Hi Rui,
>>
>> I checked the AvroUtils code. There is a static initializer block that
>> registers Avro timestamp conversion functions for the logical type
>> timestamp-millis.
>>
>> *// Code Snippet below*
>> static {
>>   // This works around a bug in the Avro library (AVRO-1891) around
>>   // SpecificRecord's handling of DateTime types.
>>   SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>>   GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>> }
>>
>> Because of this, when a generic record from Kafka is deserialized using
>> KafkaAvroDeserializer, the long value written at the producer end gets
>> converted to joda-time during deserialization.
>>
>> Next, when we try to convert this GenericRecord to a Row as part of the
>> AvroUtils.toBeamRowStrict function, we again try to convert the value
>> received to joda-time.
>> But the exception is thrown because there is a type cast to Long.
>>
>> *// Code Snippet Below:*
>> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>>   return convertDateTimeStrict((Long) value, fieldType);  *<-- Class cast exception is thrown here, as we are typecasting from JodaTime to Long*
>> }
>>
>> private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
>>   checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
>>   return new Instant(value);  *<-- Creates a JodaTime instance here*
>> }
>>
>> *Thanks & Regards,*
>>
>> *Vishwas*
>>
>> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote:
>>
>>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from the
>>> GenericRecord and tries to cast them based on their types (and
>>> casts the object to long for "timestamp-millis"). See [1].
>>>
>>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>>> GenericRecord should have a long for "timestamp-millis".
>>>
>>> The schema you pasted looks right. Not sure why the generated class uses
>>> Joda time (is it controlled by some flags?). But at least you could write a
>>> small function to do the schema conversion you need.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>>
>>> Rui
>>>
>>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>
>>>> Hi Rui,
>>>>
>>>> I agree that converting it to long avoids the error.
>>>> But KafkaIO is giving a GenericRecord with an attribute of type
>>>> JodaTime. Now I convert it to long. Then AvroUtils.toBeamRowStrict
>>>> converts it to JodaTime again.
>>>>
>>>> I used the avro tools 1.8.2 jar for the schema below, and I see that
>>>> the generated class has a JodaTime attribute.
>>>>
>>>> {
>>>>   "name": "timeOfRelease",
>>>>   "type": {
>>>>     "type": "long",
>>>>     "logicalType": "timestamp-millis",
>>>>     "connect.version": 1,
>>>>     "connect.name": "org.apache.kafka.connect.data.Timestamp"
>>>>   }
>>>> }
>>>>
>>>> *Attribute type in generated class:*
>>>> private org.joda.time.DateTime timeOfRelease;
>>>>
>>>> So I am not sure why this type casting is required.
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas*
>>>>
>>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>> From reading the code, it seems that, as the logical type
>>>>> "timestamp-millis" implies, it expects millis as Long values under
>>>>> this logical type.
>>>>>
>>>>> So if you can convert joda-time to millis before calling
>>>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your
>>>>> exception will be gone.
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Below is my pipeline:
>>>>>>>
>>>>>>> KafkaSource (KafkaIO.read) ------> ParDo ---------------> BeamSql
>>>>>>> ---------------> KafkaSink (KafkaIO.write)
>>>>>>>
>>>>>>> The Avro schema of the topic has a field of logical type
>>>>>>> timestamp-millis. The KafkaIO.read transform creates a
>>>>>>> KafkaRecord<String,GenericRecord>, where this field is converted to
>>>>>>> joda-time.
>>>>>>>
>>>>>>> In my ParDo transform, I am trying to use the AvroUtils class
>>>>>>> methods to convert the generic record to a Beam Row, and I get the
>>>>>>> class cast exception below for the joda-time attribute.
>>>>>>>
>>>>>>> AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>>>>
>>>>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>>>>
>>>>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>>>>>> for this
>>>>>>>
>>>>>>> *Thanks & Regards,*
>>>>>>>
>>>>>>> *Vishwas*
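
For anyone hitting the same exception before upgrading, the workaround suggested in this thread (convert the joda-time value back to millis before calling AvroUtils.toBeamRowStrict) could look roughly like the sketch below. It is only an illustration under stated assumptions, not Beam or Avro code: the class TimestampMillisNormalizer and its methods are hypothetical, a plain Map stands in for GenericRecord, and java.time.Instant stands in for org.joda.time.DateTime so the example runs without extra dependencies. With Joda-Time, getMillis() would be the analogous call to toEpochMilli().

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Beam or Avro.
public class TimestampMillisNormalizer {

    // Returns epoch millis whether the value is still a raw Long or was already
    // turned into a date-time object by a registered logical-type conversion.
    // Assumption: java.time.Instant stands in for org.joda.time.DateTime.
    public static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();
        }
        throw new IllegalArgumentException("Unsupported timestamp value: " + value);
    }

    // Copies the record (a Map standing in for GenericRecord) and rewrites
    // the named field so that it holds a Long again.
    public static Map<String, Object> normalizeField(Map<String, Object> record, String field) {
        Map<String, Object> copy = new LinkedHashMap<>(record);
        copy.put(field, toEpochMillis(record.get(field)));
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("timeOfRelease", Instant.ofEpochMilli(1555300000000L));

        // A blind cast fails in the same way as the stack trace in this thread.
        try {
            Long broken = (Long) record.get("timeOfRelease");
            System.out.println("unexpected: " + broken);
        } catch (ClassCastException e) {
            System.out.println("blind cast failed with ClassCastException");
        }

        // Normalizing first leaves a Long that a strict cast can handle.
        Map<String, Object> fixed = normalizeField(record, "timeOfRelease");
        System.out.println(fixed.get("timeOfRelease"));
    }
}
```

In a real pipeline the same idea would be applied to the GenericRecord field inside the ParDo, before the Row conversion. Checking the runtime type first keeps the helper safe for records that still carry a plain long.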