Hi!
@Arvid: We are using Avro 1.8, I believe, but this problem seems to come from
the Flink side, as Dawid mentioned.
@Dawid:
Sounds like a reasonable explanation. Here are the actual queries to
reproduce it within the SQL client/Table API:
CREATE TABLE source_table (
  int_field INT,
  timestamp_field TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'avro_tset',
  'connector.properties.bootstrap.servers' = '<...>',
  'format.type' = 'avro',
  'format.avro-schema' = '{
    "type": "record",
    "name": "test",
    "fields" : [
      {"name": "int_field", "type": "int"},
      {"name": "timestamp_field", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
  }'
);

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.lang.Long
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
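
In case it helps whoever picks this up, here is a minimal sketch of the two conversions involved, using only the JDK. It is not Flink or Avro code: the class and method names are hypothetical, and interpreting the zone-less LocalDateTime as UTC is my assumption. The first method produces the long that a timestamp-millis field physically stores, the second produces the java.sql.Timestamp bridging type that Dawid mentioned.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper for illustration only; not part of Flink or Avro.
public class TimestampMillisSketch {

    // Avro's timestamp-millis logical type is physically a long holding
    // milliseconds since 1970-01-01T00:00:00 UTC.
    // Assumption: the zone-less LocalDateTime is interpreted as UTC.
    public static long toEpochMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Converts to java.sql.Timestamp. Note that Timestamp.valueOf keeps the
    // same wall-clock fields and interprets them in the JVM default zone.
    public static Timestamp toSqlTimestamp(LocalDateTime value) {
        return Timestamp.valueOf(value);
    }

    public static void main(String[] args) {
        // Same value as in the INSERT statement above.
        LocalDateTime value = LocalDateTime.of(1999, 11, 11, 11, 11, 11);
        System.out.println(toEpochMillis(value));
        System.out.println(toSqlTimestamp(value));
    }
}
```

Either value would be something the Avro writer side can handle, whereas the raw LocalDateTime is what currently ends up being cast to Long.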
I will open a Jira ticket as well with these details.
Thank you!
Gyula
On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz wrote:
> Hi Gyula,
>
> I have not verified it locally yet, but I think you are hitting yet
> another problem of the unfinished migration from the old TypeInformation-based
> type system to the new type system based on DataTypes. As far as I
> understand the problem, the information about the bridging class
> (java.sql.Timestamp in this case) is lost in the stack. Because this
> information is lost/not respected, the planner produces LocalDateTime
> instead of a proper java.sql.Timestamp. The AvroRowSerializationSchema
> expects java.sql.Timestamp for a column of TIMESTAMP type and thus it fails
> for LocalDateTime. I really hope the effort of FLIP-95 will significantly
> reduce the number of problems.
>
> It's definitely worth reporting a bug.
>
> BTW could you share how you create the Kafka Table sink to have the full
> picture?
>
> Best,
>
> Dawid
> On 29/04/2020 15:42, Gyula Fóra wrote:
>
> Hi All!
>
> We are trying to work with Avro-serialized data from Kafka using the Table
> API and use the TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
>
> {"name": "timestamp_field", "type": {"type":"long", "logicalType":
> "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot
> be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long
> are in module java.base of loader 'bootstrap')
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the Avro format (serializer) is not aware of the logical type
> conversion that is needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase which uses all kinds of logical types but I
> could only find logic that maps between Avro POJOs and tables and none that
> actually uses the serialization/deserialization logic with the format.
>
> Could someone please help me with this? Maybe what I am trying to do is not
> possible, or I just missed a crucial step.
>
> Thank you!
> Gyula