Hi Lorenzo,

Looking at the stack trace, the issue is that copying a record between chained operators uses the serializer directly. You need to enableObjectReuse() [1] to avoid that copy. If you enable it, make sure that you are not modifying or caching records after emitting them in your pipeline (except for Flink managed state).

Then it should be possible to use a map directly after the source to convert the records into POJOs, and I'd assume that a different serializer will be picked for everything downstream.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html
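A minimal sketch of that combination, using Flink 1.10-era DataStream APIs. The POJO, the field names ("account", "timestamp"), the inlined schema, and the path are hypothetical stand-ins for the real ones:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class AvroIngestSketch {

    // Hand-written POJO: public fields and a public no-arg constructor,
    // so Flink picks its POJO serializer instead of the AvroSerializer.
    public static class AccountEntryPojo {
        public String account;
        public long timestampMillis; // keep the raw long; no Instant/DateTime involved
        public AccountEntryPojo() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip the defensive deep copy between chained operators,
        // i.e. the AvroSerializer.copy() call from the stack trace below.
        env.getConfig().enableObjectReuse();

        // Hypothetical schema; the real one would be extracted from the data files.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AccountEntry\",\"fields\":["
                + "{\"name\":\"account\",\"type\":\"string\"},"
                + "{\"name\":\"timestamp\",\"type\":{\"type\":\"long\","
                + "\"logicalType\":\"timestamp-millis\"}}]}");

        String sourceBasePath = "file:///tmp/avro-input"; // hypothetical path
        AvroInputFormat<GenericRecord> inputFormat =
                new AvroInputFormat<>(new Path(sourceBasePath), GenericRecord.class);

        DataStream<AccountEntryPojo> entries = env
                .readFile(inputFormat, sourceBasePath,
                        FileProcessingMode.PROCESS_CONTINUOUSLY, 1_000L)
                .returns(new GenericRecordAvroTypeInfo(schema))
                // Convert right after the source, so the GenericRecord never
                // crosses another operator boundary.
                .map(new MapFunction<GenericRecord, AccountEntryPojo>() {
                    @Override
                    public AccountEntryPojo map(GenericRecord record) {
                        AccountEntryPojo pojo = new AccountEntryPojo();
                        pojo.account = String.valueOf(record.get("account"));
                        // GenericRecord hands back the raw long unless
                        // logical-type conversions are registered.
                        pojo.timestampMillis = (long) record.get("timestamp");
                        return pojo;
                    }
                });

        entries.print();
        env.execute("avro-file-ingest");
    }
}

Per the reasoning above, object reuse removes the failing copy between the chained source and map, and from the map onwards only the hand-written POJO travels through the pipeline.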
On Wed, Jun 10, 2020 at 11:13 PM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

> Hi Arvid,
>
> I confirm that in case 3) the problem is the AvroSerializer.
>
> How can I use a different serializer with the AvroInputFormat?
>
> I would be happy to get the file ingestion working and map to a hand-written POJO immediately afterwards, to avoid any inefficiency or headache with moving GenericRecords around, if this is what you mean by back-and-forth type transformation.
>
> Cheers
> Lorenzo
>
> On Wed, 10 Jun 2020, 17:52 Arvid Heise, <ar...@ververica.com> wrote:
>
>> Hi Lorenzo,
>>
>> 1) I'm surprised that this doesn't work. I'd like to see that stack trace.
>>
>> 2) This cannot work, because we bundle Avro 1.8.2. You could retest with dateTimeLogicalType='Joda' set, but then you will probably see the same issue as in 1).
>>
>> 3) I'm surprised that this doesn't work either. There has been a codepath for GenericRecord since 2016 and it is covered by a test. From the error description and the ticket, it looks like the issue is not the AvroInputFormat but the serializer. So it would probably work with a different serializer (but that would cause back-and-forth type transformation).
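For 2), the switch Arvid refers to is presumably the avro-maven-plugin's dateTimeLogicalTypeImplementation option in the 1.9.x line; the exact parameter name is an assumption worth checking against the plugin docs for your version. Set to joda, the generated specific records use org.joda.time types for timestamps again, matching the Avro 1.8.2 runtime that Flink bundles:

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
        <!-- Assumed parameter: generate Joda-based accessors instead of java.time -->
        <dateTimeLogicalTypeImplementation>joda</dateTimeLogicalTypeImplementation>
      </configuration>
    </execution>
  </executions>
</plugin>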
>>
>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>>
>>> Thanks Timo,
>>>
>>> the stack trace with the 1.9.2-generated specific record is the following:
>>>
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>   ... 7 more
>>>
>>> I reckon logical types might have been considered somewhat experimental since... ever. But honestly, I have been using them in the Kafka/Java ecosystem, as well as in Spark, without too many problems.
>>>
>>> For my specific use case, the schema is a given. Messages are produced by a 3rd party and we cannot change the schema (especially because it is a legit schema). I am desperately looking for a workaround.
>>>
>>> I had a similar issue with a Kafka source and AVRO records containing decimals and timestamps: timestamps worked, but decimals did not. I was able to work around that problem by using GenericRecords. But the Kafka source relies on AvroDeserializationSchema rather than AvroSerializer, and it has no problem handling GenericRecords.
>>>
>>> I honestly find it very confusing that different Flink core components handle AVRO deserialization in different ways.
>>>
>>> Cheers
>>> Lorenzo
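The Kafka-side workaround Lorenzo describes, reading GenericRecords via AvroDeserializationSchema, looks roughly like the following sketch; the topic name, consumer properties, and the inlined schema are hypothetical:

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaGenericRecordSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical schema with a decimal logical type, as in Lorenzo's case.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"AccountEntry\",\"fields\":["
                + "{\"name\":\"amount\",\"type\":{\"type\":\"bytes\","
                + "\"logicalType\":\"decimal\",\"precision\":18,\"scale\":2}}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical
        props.setProperty("group.id", "avro-ingest");             // hypothetical

        // forGeneric() deserializes Kafka messages straight into GenericRecords;
        // no AvroSerializer deep copy is involved at this stage.
        DataStream<GenericRecord> records = env.addSource(
                new FlinkKafkaConsumer<>("account-entries",       // hypothetical topic
                        AvroDeserializationSchema.forGeneric(schema), props));

        records.print();
        env.execute("kafka-generic-avro");
    }
}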
>>>
>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Lorenzo,
>>>>
>>>> as far as I know, we don't support Avro's logical types in Flink's AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>
>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>
>>>> Could you send us the full stack trace? I think this should actually work, because specific records are handled as POJOs, and those should also be able to deal with the logical types' classes through Kryo.
>>>>
>>>> Reg 3) Generic record:
>>>>
>>>> It would be great if we could make this option possible. We could include it in the next minor release fix.
>>>>
>>>> Sorry for the bad user experience. But IMHO logical types are still experimental in Avro. Maybe 1.9.2 has finally fixed the biggest shortcomings, such that Flink can properly support them as well.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>
>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>> > Hi,
>>>> >
>>>> > I need to continuously ingest AVRO files as they arrive. Files are written by an S3 Sink Kafka Connect, but S3 is not the point here. I started by trying to ingest a static bunch of files from the local fs first, and I am having weird issues with AVRO deserialization.
>>>> >
>>>> > I have to say, the records contain logical types: timestamp-millis and decimals.
>>>> >
>>>> > To keep it simple, I extracted the AVRO schema from the data files and used avro-maven-plugin to generate POJOs. I tried multiple combinations, all with no luck.
>>>> >
>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>> >
>>>> > Path in = new Path(sourceBasePath);
>>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in, AccountEntries.class);
>>>> > DataStream<AccountEntries> accountEntries = env
>>>> >     .readFile(inputFormat, sourceBasePath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>> >
>>>> > *Result*
>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>> >
>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>> > Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.
>>>> >
>>>> > *Result*
>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>> >
>>>> > 3) Generic record
>>>> > I am getting the Schema from the generated specific record, for convenience, but I am not using the generated POJO as the record. I also followed the suggestions in this Flink blog post <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic> to explicitly specify the TypeInfo with returns(...).
>>>> >
>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>> > Schema schema = AccountEntries.getClassSchema();
>>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in, GenericRecord.class);
>>>> > DataStream<GenericRecord> accountEntries = env
>>>> >     .readFile(inputFormat, config.sourceFileSystemPath, FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>> >
>>>> > *Result*
>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable: The class is not a proper class. It is either abstract, an interface, or a primitive type.
>>>> >
>>>> > This looks like a bug. I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223> and I will try to submit a fix, but that still does not solve my problem, as I am using a managed Flink I cannot update.
>>>> > I cannot believe there is no workaround. I do not think I'm trying to do anything bizarre. Am I?
>>>> >
>>>> > Any ideas? Am I missing something obvious?
>>>> >
>>>> > Cheers
>>>> > Lorenzo
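For reference, a minimal AVRO schema of the shape described in the original mail, with one timestamp-millis and one decimal logical type (record and field names are made up), looks like this:

{
  "type": "record",
  "name": "AccountEntry",
  "namespace": "example",
  "fields": [
    {"name": "account_id", "type": "string"},
    {"name": "amount",
     "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 2}},
    {"name": "created_at",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}

avro-maven-plugin 1.8.2 compiles timestamp-millis to org.joda.time.DateTime accessors, while 1.9.2 uses java.time.Instant by default; that difference is exactly what separates the results of cases 1) and 2) above.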
--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng