I see. I created this PR [1] to ask for feedback from a reviewer who knows
Avro in Beam better.
-Rui


[1]: https://github.com/apache/beam/pull/8376


On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm <bmvish...@gmail.com> wrote:

> Hi Rui,
>
> I checked the AvroUtils code. There is a static initializer block that
> registers the Avro timestamp conversion functions for the logical type
> timestamp-millis.
>
> *// Code Snippet below*
> static {
>   // This works around a bug in the Avro library (AVRO-1891) around
>   // SpecificRecord's handling of DateTime types.
>   SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>   GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
> }
>
> Because of this, when a generic record from Kafka is deserialized using
> KafkaAvroDeserializer, the long value written at the producer end gets
> converted to joda-time during deserialization.
>
> Next, when we try to convert this GenericRecord to a Row in the
> AvroUtils.toBeamRowStrict function, we again try to convert the received
> value to joda-time. But an exception is thrown because the value is first
> cast to Long.
>
> *// Code Snippet Below:*
> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>   // Class cast exception is thrown here, as we are typecasting from JodaTime to Long
>   return convertDateTimeStrict((Long) value, fieldType);
> }
>
> private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
>   checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
>   return new Instant(value); // Creates a JodaTime instance here
> }
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
>
> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote:
>
>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from the
>> GenericRecord and casts each object based on its type (for
>> "timestamp-millis", it casts the object to Long). See [1].
>>
>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>> GenericRecord should have long for "timestamp-millis".
>>
>> The schema you pasted looks right. I'm not sure why the generated class
>> uses Joda time (is it controlled by some flags?). But at least you could
>> write a small function to do the conversion you need.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>
>>
>> Rui
>>
>>
>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>
>>> Hi Rui,
>>>
>>> I agree that converting it to long avoids the error.
>>> But KafkaIO gives a GenericRecord with an attribute of type
>>> JodaTime. If I convert it to long, AvroUtils.toBeamRowStrict
>>> then converts it back to JodaTime.
>>>
>>> I ran the avro-tools 1.8.2 jar on the schema below, and I see that the
>>> generated class has a JodaTime attribute.
>>>
>>> {
>>>     "name": "timeOfRelease",
>>>     "type": {
>>>         "type": "long",
>>>         "logicalType": "timestamp-millis",
>>>         "connect.version": 1,
>>>         "connect.name": "org.apache.kafka.connect.data.Timestamp"
>>>     }
>>> }
>>>
>>> *Attribute type in generated class:*
>>> private org.joda.time.DateTime timeOfRelease;
>>>
>>>
>>> So I am not sure why this type cast is required.
>>>
>>>
>>> *Thanks & Regards,*
>>>
>>> *Vishwas *
>>>
>>>
>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> From reading the code, it seems that the logical type
>>>> "timestamp-millis" expects its values as millis in a Long.
>>>>
>>>> So if you convert joda-time to millis before calling
>>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>>>> will be gone.
>>>>
>>>> -Rui
>>>>
>>>>
>>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> +dev <d...@beam.apache.org>
>>>>>
>>>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Below is my pipeline:
>>>>>>
>>>>>> KafkaSource (KafkaIO.read) --> ParDo --> BeamSql --> KafkaSink (KafkaIO.write)
>>>>>>
>>>>>>
>>>>>> The Avro schema of the topic has a field of logical type
>>>>>> timestamp-millis. The KafkaIO.read transform creates a
>>>>>> KafkaRecord<String,GenericRecord>, in which this field is converted to
>>>>>> joda-time.
>>>>>>
>>>>>> In my ParDo transform, I am trying to use the AvroUtils class methods
>>>>>> to convert the generic record to a Beam Row, and I get the class cast
>>>>>> exception below for the joda-time attribute.
>>>>>>
>>>>>>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>>>
>>>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>>>
>>>>>> I have opened a JIRA issue for this:
>>>>>> https://issues.apache.org/jira/browse/BEAM-7073
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Thanks & Regards,*
>>>>>>
>>>>>> *Vishwas *
>>>>>>
>>>>>>
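
A minimal sketch of the workaround discussed in this thread: turn date-time
values back into epoch millis before handing the record to a converter that
casts "timestamp-millis" fields to Long. To stay self-contained it uses
stand-ins that are assumptions, not the types from the thread: a
Map<String, Object> plays the role of the Avro GenericRecord, and
java.time.Instant plays the role of org.joda.time.DateTime (with Joda-Time the
equivalent call would be getMillis()). The class and method names are
hypothetical.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of Beam or Avro.
class TimestampMillisNormalizer {

    // Returns epoch millis as a Long when the value has already been turned
    // into a date-time object; any other value is returned unchanged.
    static Object toEpochMillis(Object value) {
        if (value instanceof Instant) {
            return ((Instant) value).toEpochMilli();
        }
        return value;
    }

    // Builds a copy of the record (a Map stand-in for GenericRecord) in which
    // every date-time field holds a Long again.
    static Map<String, Object> normalize(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            out.put(entry.getKey(), toEpochMillis(entry.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("timeOfRelease", Instant.ofEpochMilli(1555300000000L));
        record.put("name", "example");

        Map<String, Object> fixed = normalize(record);

        // This cast is the step that failed in the thread; it succeeds here
        // because the value is a Long again.
        Long millis = (Long) fixed.get("timeOfRelease");
        System.out.println(millis); // prints 1555300000000
    }
}
```

In the pipeline from the thread, a check of this kind would presumably run in
the ParDo, just before AvroUtils.toBeamRowStrict(genericRecord,
this.beamSchema) is called.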
