Hi Arvid,

I confirm that in case 3) the problem is the AvroSerializer.
How can I use a different serializer with AvroFileFormat?

I would be happy to get the file ingestion working and map to a
hand-written POJO immediately afterwards, to avoid any inefficiency or
headache from moving GenericRecords around, if that is what you mean by
back-and-forth type transformation.

Cheers
Lorenzo

On Wed, 10 Jun 2020, 17:52 Arvid Heise, <ar...@ververica.com> wrote:

> Hi Lorenzo,
>
> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>
> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
> with dateTimeLogicalType='Joda' set, but then you will probably see the
> same issue as 1)
>
> 3) I'm surprised that this doesn't work either. There is a codepath since
> 2016 for GenericRecord and it's covered in a test. From the error
> description and the ticket, it looks like the issue is not the
> AvroInputFormat, but the serializer. So it would probably work with a
> different serializer (but that would cause back and forth type
> transformation).
>
> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
> wrote:
>
>> Thanks Timo,
>>
>> the stacktrace with 1.9.2-generated specific file is the following
>>
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>   ... 7 more
>>
>>
>> I reckon logical types might have been considered somehow experimental
>> since...ever. But, honestly, I've been using them in the Kafka/Java
>> ecosystem as well as in Spark without too many problems.
>>
>> For my specific use case, the schema is given. Messages are produced by a
>> 3rd party and we cannot change the schema (especially because it's a legit
>> schema).
>> I am desperately looking for a workaround.
>>
>> I had a similar issue with a Kafka Source, and AVRO records containing
>> decimals and timestamps. Timestamps worked but not decimals.
>> I was able to work around the problem using GenericRecords.
>> But the Kafka source relies on AvroDeserializationSchema rather than
>> AvroSerializer, and has no problem handling GenericRecords.
>>
>> I honestly find it very confusing to have different ways of handling
>> AVRO deserialization inside Flink core components.
>>
>> Cheers
>> Lorenzo
>>
>>
>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Lorenzo,
>>>
>>> as far as I know we don't support Avro's logical types in Flink's
>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>
>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>
>>> Could you send us the full stack trace? I think this should actually
>>> work, because specific records are handled as POJOs and those should be
>>> able to also deal with the logical types' classes through Kryo.
>>>
>>> Reg 3) Generic record
>>>
>>> It would be great if we can make this option possible.
>>> We could include it in the next minor release fix.
>>>
>>> Sorry for the bad user experience. But IMHO logical types are still
>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>> shortcomings such that Flink can properly support them as well.
>>>
>>> Regards,
>>> Timo
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>
>>>
>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>> > Hi,
>>> >
>>> > I need to continuously ingest AVRO files as they arrive.
>>> > Files are written by an S3 Sink Kafka Connect but S3 is not the point
>>> > here. I started trying to ingest a static bunch of files from local fs
>>> > first and I am having weird issues with AVRO deserialization.
>>> >
>>> > I have to say, the records contain logical types, timestamps-ms and
>>> > decimals
>>> >
>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>> > used avro-maven-plugin to generate POJOs.
>>> > I tried multiple combinations, all with no luck
>>> >
>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>> >
>>> > Path in = new Path(sourceBasePath);
>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in,
>>> >     AccountEntries.class);
>>> > DataStream<AccountEntries> accountEntries = env
>>> >     .readFile(inputFormat, sourceBasePath,
>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>> >
>>> > *Result*
>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>> > org.joda.time.DateTime
>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>> >
>>> >
>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>> >
>>> > *Result*
>>> > org.apache.avro.AvroRuntimeException: Unknown datum type
>>> > java.time.Instant
>>> >
>>> >
>>> > 3) Generic record
>>> > I am getting the Schema from the generated specific record,
>>> > for convenience, but I am not using the generated POJO as record.
>>> > I also followed the suggestions in this Flink blog post
>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>>> > to explicitly specify the TypeInfo with returns(...)
>>> >
>>> > Path in = new Path(config.sourceFileSystemPath);
>>> > Schema schema = AccountEntries.getClassSchema();
>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in,
>>> >     GenericRecord.class);
>>> > DataStream<GenericRecord> accountEntries = env
>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>> >
>>> >
>>> > *Result*
>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>> > The class is not a proper class. It is either abstract, an interface,
>>> > or a primitive type.
>>> >
>>> > This looks like a bug.
>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>> > and I will try to submit a fix, but that still does not solve my
>>> > problem, as I am using a managed Flink that I cannot update.
>>> > I cannot believe there is no workaround. I do not think I'm trying to
>>> > do anything bizarre. Am I?
>>> >
>>> > Any ideas?
>>> > Am I missing something obvious?
>>> >
>>> > Cheers
>>> > Lorenzo
>>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
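
On the "map to a hand-written POJO" idea from the top of this message, here is a minimal sketch of the conversion step only. It assumes the raw values are available the way Avro encodes these logical types: timestamp-millis as a long counting milliseconds since the Unix epoch, and decimal as the two's-complement big-endian bytes of the unscaled value, with the scale coming from the schema. The names LogicalTypeDecoding, AccountEntryPojo, createdAt and amount, and the scale of 2 used below, are hypothetical and not taken from the actual schema. It uses only the JDK and does not address the AvroSerializer problem itself.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;

// Hypothetical helper: turns raw Avro logical-type encodings into plain JDK values.
final class LogicalTypeDecoding {

    private LogicalTypeDecoding() {}

    // timestamp-millis: a long holding milliseconds since 1970-01-01T00:00:00Z.
    static Instant decodeTimestampMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // decimal: two's-complement big-endian bytes of the unscaled value.
    // The scale is not stored in the data; it must come from the schema.
    static BigDecimal decodeDecimal(ByteBuffer unscaledBytes, int scale) {
        // Work on a duplicate so the caller's buffer position is left untouched.
        ByteBuffer view = unscaledBytes.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }
}

// Hypothetical hand-written POJO: public fields plus a no-arg constructor.
final class AccountEntryPojo {
    public Instant createdAt;
    public BigDecimal amount;

    public AccountEntryPojo() {}

    // Builds the POJO from the raw field values of one record.
    static AccountEntryPojo fromRaw(long createdAtMillis, ByteBuffer amountBytes, int amountScale) {
        AccountEntryPojo pojo = new AccountEntryPojo();
        pojo.createdAt = LogicalTypeDecoding.decodeTimestampMillis(createdAtMillis);
        pojo.amount = LogicalTypeDecoding.decodeDecimal(amountBytes, amountScale);
        return pojo;
    }
}
```

A call such as AccountEntryPojo.fromRaw(millis, bytes, 2) would sit inside whatever map function receives the raw record. Whether the values arrive raw or already converted depends on the Avro version and the reader configuration, so that part remains an assumption here.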