Hi Lorenzo,

1) I'm surprised that this doesn't work. I'd like to see that stacktrace.

2) This cannot work as it is, because we bundle Avro 1.8.2. You could retest
with dateTimeLogicalType='Joda' set in the avro-maven-plugin, but then you
will probably see the same issue as in 1).

3) I'm surprised that this doesn't work either. There has been a codepath
for GenericRecord since 2016, and it is covered by a test. From the error
description and the ticket, it looks like the issue is not the
AvroInputFormat but the serializer. So it would probably work with a
different serializer (though that would cause type transformations back and
forth).
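One thing you could try in the meantime (an untested sketch on my side; only
safe if none of your chained functions cache or mutate incoming records): the
call that fails in your stack trace is AvroSerializer.copy(), which is only
invoked because records are defensively copied between chained operators.
Enabling object reuse skips that copy:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skip the defensive per-record copy between chained operators
// (the CopyingChainingOutput -> AvroSerializer.copy() path in the stack trace).
env.getConfig().enableObjectReuse();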
On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
wrote:

> Thanks Timo,
>
> the stacktrace with the 1.9.2-generated specific record is the following:
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>   ... 7 more
>
> I reckon logical types might have been considered somewhat experimental
> since... ever. But, honestly, I've been using them in the Kafka/Java
> ecosystem, as well as in Spark, without too many problems.
>
> For my specific use case the schema is a given. Messages are produced by a
> third party and we cannot change the schema (especially because it's a
> legit schema).
> I am desperately looking for a workaround.
>
> I had a similar issue with a Kafka source and AVRO records containing
> decimals and timestamps. Timestamps worked, but decimals did not.
> I was able to work around the problem using GenericRecords.
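> In sketch form (the topic name, consumer properties, and the way the
> Schema is obtained are placeholders here, not the real project code):
>
> import java.util.Properties;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.formats.avro.AvroDeserializationSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
> Schema schema = new Schema.Parser().parse(schemaJson); // schemaJson: the given 3rd-party schema
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
> props.setProperty("group.id", "avro-ingest");         // placeholder
>
> // Deserialize straight to GenericRecord, bypassing the generated POJOs
> // and their logical-type conversions.
> DataStream<GenericRecord> records = env.addSource(new FlinkKafkaConsumer<>(
>     "my-topic", AvroDeserializationSchema.forGeneric(schema), props));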
> But the Kafka source relies on AvroDeserializationSchema rather than
> AvroSerializer, and has no problem handling GenericRecords.
>
> I honestly find it very confusing that there are different ways of
> handling AVRO deserialization inside Flink's core components.
>
> Cheers
> Lorenzo
>
>
> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Lorenzo,
>>
>> as far as I know, we don't support Avro's logical types in Flink's
>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>
>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>
>> Could you send us the full stack trace? I think this should actually
>> work, because specific records are handled as POJOs and those should be
>> able to also deal with logical types' classes through Kryo.
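>> For example, for the Joda classes that the 1.8.2 plugin generates, a
>> Kryo serializer can be registered explicitly (a sketch; it assumes the
>> de.javakaffee kryo-serializers dependency is on the classpath):
>>
>> import org.joda.time.DateTime;
>> import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
>>
>> // Teach Kryo how to handle Joda's DateTime, which it cannot
>> // serialize correctly out of the box.
>> env.getConfig().addDefaultKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);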
>> Reg 3) Generic record
>>
>> It would be great if we could make this option work. We could include
>> a fix in the next minor release.
>>
>> Sorry for the bad user experience. But IMHO logical types are still
>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>> shortcomings, such that Flink can properly support them as well.
>>
>> Regards,
>> Timo
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>
>>
>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>> > Hi,
>> >
>> > I need to continuously ingest AVRO files as they arrive.
>> > Files are written by a Kafka Connect S3 sink, but S3 is not the point
>> > here. I started by trying to ingest a static bunch of files from the
>> > local fs first, and I am having weird issues with AVRO deserialization.
>> >
>> > I have to say, the records contain logical types: timestamp-millis and
>> > decimal.
>> >
>> > To keep it simple, I extracted the AVRO schema from the data files and
>> > used avro-maven-plugin to generate POJOs.
>> > I tried multiple combinations, all with no luck.
>> >
>> > 1) Specific record generated with the AVRO 1.8.2 plugin
>> >
>> > Path in = new Path(sourceBasePath);
>> > AvroInputFormat<AccountEntries> inputFormat =
>> >     new AvroInputFormat<>(in, AccountEntries.class);
>> > DataStream<AccountEntries> accountEntries = env
>> >     .readFile(inputFormat, sourceBasePath,
>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>> >
>> > *Result*
>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>> > org.joda.time.DateTime
>> > (IIRC this is a known AVRO 1.8.2 issue)
>> >
>> >
>> > 2) Specific record generated with the AVRO 1.9.2 plugin
>> >
>> > Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.
>> >
>> > *Result*
>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>> >
>> >
>> > 3) Generic record
>> >
>> > I am getting the Schema from the generated specific record, for
>> > convenience, but I am not using the generated POJO as the record.
>> > I also followed the suggestions in this Flink blog post
>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>> > to explicitly specify the TypeInfo with returns(...):
>> >
>> > Path in = new Path(config.sourceFileSystemPath);
>> > Schema schema = AccountEntries.getClassSchema();
>> > AvroInputFormat<GenericRecord> inputFormat =
>> >     new AvroInputFormat<>(in, GenericRecord.class);
>> > DataStream<GenericRecord> accountEntries = env
>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>> >
>> > *Result*
>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>> > The class is not a proper class. It is either abstract, an interface,
>> > or a primitive type.
>> >
>> > This looks like a bug.
>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>> > and I will try to submit a fix, but that still does not solve my
>> > problem, as I am using a managed Flink I cannot update.
>> > I cannot believe there is no workaround. I do not think I'm trying to
>> > do anything bizarre. Am I?
>> >
>> > Any ideas?
>> > Am I missing something obvious?
>> >
>> > Cheers
>> > Lorenzo


--
Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany