PR #8376 is merged, and it should be in the 2.14.0 release.

-Rui

On Mon, Apr 22, 2019 at 10:49 AM Rui Wang <ruw...@google.com> wrote:

> I see. I created this PR [1] to ask for feedback from reviewers who know
> Avro in Beam better.
>
> -Rui
>
>
> [1]: https://github.com/apache/beam/pull/8376
>
>
> On Sun, Apr 21, 2019 at 11:19 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>
>> Hi Rui,
>>
>> I checked the AvroUtils code. There is a static initializer block that
>> registers Avro timestamp conversion functions for the logical type
>> timestamp-millis.
>>
>> *// Code Snippet below*
>> static {
>>   // This works around a bug in the Avro library (AVRO-1891) around
>>   // SpecificRecord's handling of DateTime types.
>>   SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>>   GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
>> }
>>
>> Because of this, when a generic record from Kafka is deserialized using
>> KafkaAvroDeserializer, the long value produced at the producer end gets
>> converted to joda-time during deserialization.
>>
>> Next, when we convert this GenericRecord to a Row in
>> AvroUtils.toBeamRowStrict, we again try to convert the received value
>> to joda-time, but an exception is thrown because the value is cast to Long.
>>
>> *// Code Snippet Below:*
>> else if (logicalType instanceof LogicalTypes.TimestampMillis) {
>>   // A ClassCastException is thrown here, as we are casting from JodaTime
>>   // to Long.
>>   return convertDateTimeStrict((Long) value, fieldType);
>> }
>>
>> private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
>>   checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
>>   return new Instant(value); // Creates a JodaTime instance here
>> }
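
A minimal sketch of the kind of tolerant conversion that would avoid the cast failure described above: accept the value whether it arrives as a raw Long or as an already-converted date-time object. This is only an illustration and not the change made in the PR. The class and method names are hypothetical, and java.time.Instant is used as a stand-in for the Joda-Time type so the sketch needs no external dependencies; with Joda-Time the corresponding call would be getMillis().

```java
import java.time.Instant;

/** Hypothetical helper for illustration; not part of Beam's AvroUtils. */
final class TimestampMillis {
    private TimestampMillis() {}

    /**
     * Returns epoch milliseconds for a "timestamp-millis" value that may be
     * either a raw Long or an already-converted date-time object.
     */
    static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            // The value was never converted: it is already epoch millis.
            return (Long) value;
        }
        if (value instanceof Instant) {
            // Assumption: java.time.Instant stands in for the Joda-Time type.
            return ((Instant) value).toEpochMilli();
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp-millis value: "
                + (value == null ? "null" : value.getClass().getName()));
    }
}
```

Checking the runtime type before casting keeps the strict behavior for unexpected types while accepting both representations of the same timestamp.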
>>
>>
>> *Thanks & Regards,*
>>
>> *Vishwas *
>>
>>
>>
>> On Tue, Apr 16, 2019 at 9:18 AM Rui Wang <ruw...@google.com> wrote:
>>
>>> I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
>>> Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from the
>>> GenericRecord and casts them based on their types (for
>>> "timestamp-millis", it casts the object to long). See [1].
>>>
>>> So in order to use `AvroUtils.toBeamRowStrict`, the generated
>>> GenericRecord should have a long for "timestamp-millis".
>>>
>>> The schema you pasted looks right. I am not sure why the generated class
>>> uses Joda time (is it controlled by some flags?). But at least you could
>>> write a small function to do the schema conversion you need.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672
>>>
>>>
>>> Rui
>>>
>>>
>>> On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>
>>>> Hi Rui,
>>>>
>>>> I agree that converting it to long avoids the error.
>>>> But KafkaIO gives a GenericRecord with an attribute of type
>>>> JodaTime. If I convert it to long, AvroUtils.toBeamRowStrict
>>>> then converts it back to JodaTime.
>>>>
>>>> I used the avro tools 1.8.2 jar on the schema below, and I see that
>>>> the generated class has a JodaTime attribute.
>>>>
>>>> {
>>>>     "name": "timeOfRelease",
>>>>     "type": {
>>>>         "type": "long",
>>>>         "logicalType": "timestamp-millis",
>>>>         "connect.version": 1,
>>>>         "connect.name": "org.apache.kafka.connect.data.Timestamp"
>>>>     }
>>>> }
>>>>
>>>> *Attribute type in generated class:*
>>>> private org.joda.time.DateTime timeOfRelease;
>>>>
>>>>
>>>> So I am not sure why this type cast is required.
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas *
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>> From reading the code, it seems that the logical type
>>>>> "timestamp-millis" expects its values to be milliseconds as a Long.
>>>>>
>>>>> So if you convert joda-time to millis before calling
>>>>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your
>>>>> exception will be gone.
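
The suggested workaround could look roughly like the sketch below: copy the record and replace date-time values in the known timestamp fields with epoch millis before handing it to the converter. Everything here is an assumption made for illustration. A Map stands in for the Avro GenericRecord, java.time.Instant stands in for the Joda-Time type, and the class and method names are hypothetical. With a real GenericRecord the same idea would use its get(String) and put(String, Object) methods.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical pre-processing step; a Map stands in for a GenericRecord. */
final class RecordNormalizer {
    private RecordNormalizer() {}

    /**
     * Returns a copy of the record in which the named timestamp fields hold
     * epoch milliseconds (Long) instead of date-time objects.
     */
    static Map<String, Object> normalize(
            Map<String, Object> record, Set<String> timestampFields) {
        // Copy first so the caller's record is left untouched.
        Map<String, Object> out = new LinkedHashMap<>(record);
        for (String field : timestampFields) {
            Object value = out.get(field);
            if (value instanceof Instant) {
                // Assumption: with Joda-Time this would call getMillis().
                out.put(field, ((Instant) value).toEpochMilli());
            }
        }
        return out;
    }
}
```

Fields that already hold a Long, or that are absent, pass through unchanged, so the step is safe to apply whether or not the deserializer converted the value.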
>>>>>
>>>>> -Rui
>>>>>
>>>>>
>>>>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> +dev <d...@beam.apache.org>
>>>>>>
>>>>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Below is my pipeline:
>>>>>>>
>>>>>>> KafkaSource (KafkaIO.read) --> ParDo --> BeamSql --> KafkaSink (KafkaIO.write)
>>>>>>>
>>>>>>>
>>>>>>> The avro schema of the topic has a field of logical type
>>>>>>> timestamp-millis. The KafkaIO.read transform creates a
>>>>>>> KafkaRecord<String,GenericRecord> in which this field is converted
>>>>>>> to joda-time.
>>>>>>>
>>>>>>> In my ParDo transform, I am trying to use the AvroUtils class
>>>>>>> methods to convert the generic record to a Beam Row, and I get the
>>>>>>> class cast exception below for the joda-time attribute.
>>>>>>>
>>>>>>>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>>>>
>>>>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot be cast to java.lang.Long
>>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>>>>     at org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>>>>
>>>>>>> I have opened a JIRA issue for this:
>>>>>>> https://issues.apache.org/jira/browse/BEAM-7073
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Thanks & Regards,*
>>>>>>>
>>>>>>> *Vishwas *
>>>>>>>
>>>>>>>
