Hi,

I need to continuously ingest AVRO files as they arrive.
The files are written by a Kafka Connect S3 sink connector, but S3 is not the
point here. I started by trying to ingest a static set of files from the local
filesystem first, and I am running into weird issues with AVRO deserialization.

I should mention that the records contain logical types: timestamp-millis and decimal.

To keep it simple, I extracted the AVRO schema from the data files and used
avro-maven-plugin to generate POJOs.
I tried multiple combinations, all with no luck:
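For reference, the POJO generation is roughly this plugin configuration (a sketch from memory; paths are placeholders, and `enableDecimalLogicalType` is the option that, as I understand it, maps the decimal logical type to BigDecimal):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory>
        <!-- map the decimal logical type to java.math.BigDecimal -->
        <enableDecimalLogicalType>true</enableDecimalLogicalType>
      </configuration>
    </execution>
  </executions>
</plugin>
```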

1) Specific record generated with AVRO 1.8.2 plugin

Path in = new Path(sourceBasePath);
AvroInputFormat<AccountEntries> inputFormat =
        new AvroInputFormat<>(in, AccountEntries.class);
DataStream<AccountEntries> accountEntries = env
        .readFile(inputFormat, sourceBasePath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);

*Result*
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
(IIRC this is a known AVRO 1.8.2 issue)
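One workaround I considered for this case: since I extracted the .avsc myself, I could strip the logicalType annotations before generating the POJOs, so the generated field stays a plain long instead of a joda DateTime (fragment; the field name is hypothetical). That is, this:

```json
{
  "name": "created_at",
  "type": { "type": "long", "logicalType": "timestamp-millis" }
}
```

would become:

```json
{ "name": "created_at", "type": "long" }
```

The writer schema in the files still carries the logical type, but a reader schema without it should read the underlying long just fine. It does mean doing the timestamp/decimal conversions by hand downstream, though.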


2) Specific record generated with AVRO 1.9.2 plugin
Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.

*Result*
org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
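A possible workaround I am considering here (a sketch, not verified against my Flink setup): map the specific record to a flat POJO with primitive fields in a function chained directly to the source, so Flink's serializers never see a java.time.Instant. The POJO and field names below are hypothetical; the conversion itself is trivial:

```java
import java.time.Instant;

public class InstantFlattening {

    // Hypothetical flat POJO: timestamps carried as epoch millis,
    // so Flink can treat this as a plain POJO type.
    public static class FlatAccountEntry {
        public long createdAtMillis; // instead of java.time.Instant
    }

    // Convert the logical-typed value to its primitive representation.
    public static FlatAccountEntry flatten(Instant createdAt) {
        FlatAccountEntry out = new FlatAccountEntry();
        out.createdAtMillis = createdAt.toEpochMilli();
        return out;
    }

    public static void main(String[] args) {
        Instant ts = Instant.ofEpochMilli(1591800000000L);
        System.out.println(flatten(ts).createdAtMillis); // prints 1591800000000
    }
}
```

The idea is that a MapFunction doing this conversion, chained to the source, would run before any inter-operator serialization happens.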


3) Generic record
For convenience I get the Schema from the generated specific record, but I am
not using the generated POJO as the record type.
I also followed the suggestions in this Flink blog post
<https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
to explicitly specify the TypeInformation with returns(...):

Path in = new Path(config.sourceFileSystemPath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat =
        new AvroInputFormat<>(in, GenericRecord.class);
DataStream<GenericRecord> accountEntries = env
        .readFile(inputFormat, config.sourceFileSystemPath,
                FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
        .returns(new GenericRecordAvroTypeInfo(schema));


*Result*
The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The
class is not a proper class. It is either abstract, an interface, or a
primitive type.

This looks like a bug.
I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223> and
I will try to submit a fix, but that still does not solve my problem, as I am
using a managed Flink deployment I cannot update.
I cannot believe there is no workaround; I do not think I am trying to do
anything bizarre. Am I?
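In the meantime, one thing I want to try (a sketch; I have not verified that it sidesteps the failing TypeExtractor path): pass the TypeInformation directly through the readFile overload that accepts it, instead of calling returns() on the resulting stream:

```java
Path in = new Path(config.sourceFileSystemPath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat =
        new AvroInputFormat<>(in, GenericRecord.class);

// readFile has an overload taking TypeInformation explicitly, which may
// avoid TypeExtractor trying to create type info for the GenericRecord
// interface in the first place.
DataStream<GenericRecord> accountEntries = env.readFile(
        inputFormat,
        config.sourceFileSystemPath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        FILE_SCAN_INTERVAL_MS,
        new GenericRecordAvroTypeInfo(schema));
```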

Any ideas?
Am I missing something obvious?

Cheers
Lorenzo
