I didn't find code in `AvroUtils.toBeamRowStrict` that converts long to
Joda time. `AvroUtils.toBeamRowStrict` retrieves objects from
GenericRecord, and tries to cast objects based on their types (and
cast(object) to long for "timestamp-millis"). see [1].

So in order to use `AvroUtils.toBeamRowStrict`, the generated GenericRecord
should have long for "timestamp-millis".

The schema you pasted looks right. Not sure why generated class is Joda
time (is it controlled by some flags?). But at least you could write a
small function to do schema conversion for your need.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java#L672


Rui


On Mon, Apr 15, 2019 at 7:11 PM Vishwas Bm <bmvish...@gmail.com> wrote:

> Hi Rui,
>
> I agree that by converting it to long, there will be no error.
> But the KafkaIO is giving a GenericRecord with attribute of type JodaTime.
> Now I convert it to long. Then in the  AvroUtils.toBeamRowStrict again
> converts it to JodaTime.
>
> I used the avro tools 1.8.2 jar, for the below schema and I see that the
> generated class has a JodaTime attribute.
>
> {
>             "name": "timeOfRelease",
>             "type":
>                 {
>                     "type": "long",
>                     "logicalType": "timestamp-millis",
>                     "connect.version": 1,
>                     "connect.name":
> "org.apache.kafka.connect.data.Timestamp"
>                 }
>  }
>
> *Attribute type in generated class:*
> private org.joda.time.DateTime timeOfRelease;
>
>
> So not sure why this type casting is required.
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Tue, Apr 16, 2019 at 12:56 AM Rui Wang <ruw...@google.com> wrote:
>
>> Read from the code and seems like as the logical type "timestamp-millis"
>> means, it's expecting millis in Long as values under this logical type.
>>
>> So if you can convert joda-time to millis before calling
>> "AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)", your exception
>> will gone.
>>
>> -Rui
>>
>>
>> On Mon, Apr 15, 2019 at 10:28 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> +dev <d...@beam.apache.org>
>>>
>>> On Sun, Apr 14, 2019 at 10:29 PM Vishwas Bm <bmvish...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Below is my pipeline:
>>>>
>>>> KafkaSource (KafkaIO.read) ------> Pardo ---------------> BeamSql
>>>> ---------------> KafkaSink(KafkaIO.write)
>>>>
>>>>
>>>> The avro schema of the topic has a field of logical type
>>>> timestamp-millis.  KafkaIO.read transform is creating a
>>>> KafkaRecord<String,GenericRecord>, where this field is being converted to
>>>> joda-time.
>>>>
>>>> In my Pardo transform, I am trying to use the AvroUtils class methods
>>>> to convert the generic record to Beam Row and getting below class cast
>>>> exception for the joda-time attribute.
>>>>
>>>>              AvroUtils.toBeamRowStrict(genericRecord, this.beamSchema)
>>>>
>>>> Caused by: java.lang.ClassCastException: org.joda.time.DateTime cannot
>>>> be cast to java.lang.Long
>>>>     at
>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.convertAvroFieldStrict(AvroUtils.java:664)
>>>>     at
>>>> org.apache.beam.sdk.schemas.utils.AvroUtils.toBeamRowStrict(AvroUtils.java:217)
>>>>
>>>> I have opened a jira https://issues.apache.org/jira/browse/BEAM-7073
>>>> for this
>>>>
>>>>
>>>>
>>>> *Thanks & Regards,*
>>>>
>>>> *Vishwas *
>>>>
>>>>

Reply via email to