Hi,

I need to continuously ingest AVRO files as they arrive. The files are written by a Kafka Connect S3 sink, but S3 is not the point here. I started by trying to ingest a static set of files from the local filesystem first, and I am running into weird issues with AVRO deserialization.
I should mention that the records contain logical types: timestamp-millis and decimal.

To keep it simple, I extracted the AVRO schema from the data files and used avro-maven-plugin to generate POJOs. I tried multiple combinations, all with no luck.

1) Specific record generated with the AVRO 1.8.2 plugin

```java
Path in = new Path(sourceBasePath);
AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in, AccountEntries.class);
DataStream<AccountEntries> accountEntries = env
    .readFile(inputFormat, sourceBasePath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
```

*Result*

java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime

(IIRC this is a known AVRO 1.8.2 issue.)

2) Specific record generated with the AVRO 1.9.2 plugin

Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.

*Result*

org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant

3) Generic record

I am getting the Schema from the generated specific record, for convenience, but I am not using the generated POJO as the record type. I also followed the suggestions in this Flink blog post <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic> to explicitly specify the TypeInfo with returns(...).

```java
Path in = new Path(config.sourceFileSystemPath);
Schema schema = AccountEntries.getClassSchema();
AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in, GenericRecord.class);
DataStream<GenericRecord> accountEntries = env
    .readFile(inputFormat, config.sourceFileSystemPath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
    .returns(new GenericRecordAvroTypeInfo(schema));
```

*Result*

The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is not a proper class. It is either abstract, an interface, or a primitive type.

This looks like a bug.
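In case it helps the discussion, here is a workaround I am considering for case 3 (an untested sketch; it assumes the failure happens because readFile queries the AvroInputFormat for its produced type via ResultTypeQueryable before my returns(...) is applied): subclass AvroInputFormat and override getProducedType() so the reflective TypeExtractor never sees the GenericRecord interface.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

/**
 * AvroInputFormat that reports a GenericRecordAvroTypeInfo up front,
 * instead of letting the TypeExtractor reflect on the GenericRecord
 * interface (which is what seems to blow up in readFile).
 */
public class GenericRecordAvroInputFormat extends AvroInputFormat<GenericRecord> {

    private static final long serialVersionUID = 1L;

    // GenericRecordAvroTypeInfo carries the schema itself, so the format
    // does not need to hold a (non-serializable) Schema field.
    private final GenericRecordAvroTypeInfo producedType;

    public GenericRecordAvroInputFormat(Path filePath, Schema schema) {
        super(filePath, GenericRecord.class);
        this.producedType = new GenericRecordAvroTypeInfo(schema);
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return producedType;
    }
}
```

If this works, the idea is to pass an instance of this format to readFile in place of the plain AvroInputFormat, with no returns(...) call needed. I have not been able to verify it yet on the managed environment.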
I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223> and I will try to submit a fix, but that still does not solve my problem, as I am running on a managed Flink I cannot upgrade.

I cannot believe there is no workaround. I do not think I am trying to do anything bizarre. Am I? Any ideas? Am I missing something obvious?

Cheers
Lorenzo