Hi, for the 1.8.2 case (case 1) you could try format.setReuseAvroValue(false);
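As a minimal sketch of that suggestion (hedged: it assumes the AvroInputFormat source and the generated AccountEntries class discussed below; sourceBasePath is a placeholder):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroInputFormat;

    String sourceBasePath = "/path/to/avro/files"; // placeholder, from the original post
    AvroInputFormat<AccountEntries> format =
        new AvroInputFormat<>(new Path(sourceBasePath), AccountEntries.class);
    // With reuse disabled, the format deserializes each record into a fresh
    // instance instead of mutating a single shared one.
    format.setReuseAvroValue(false);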
Best,
Guowei

On Thu, Jun 11, 2020 at 5:02 PM, Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

> Hi Arvid,
>
> thanks for the point about caching records. Gotcha!
>
> Sorry, I cannot share the full schema or generated code. It's 3rd-party
> IP and we signed an NDA... I think I can post snippets.
> The schema is heavily nested, including arrays of other record types.
> Types are primitives, or logical decimal and timestamp-millis. No unions.
>
> The #conversions field is in AccountEntries only (one of the nested
> records) and looks like this:
>
> private static final org.apache.avro.Conversion<?>[] conversions =
>     new org.apache.avro.Conversion<?>[] {
>         null,
>         null,
>         null,
>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>         new org.apache.avro.data.JodaTimeConversions.TimestampConversion(),
>         null,
>         null,
>         null,
>         null,
>         null,
>         null,
>         null
>     };
>
> Note that I have to generate the specific object with the AVRO 1.9.2
> Maven plugin.
> With 1.8.2-generated code it fails with the following exception,
> regardless of setting enableObjectReuse():
>
> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>
> Thanks for the help
> Lorenzo
>
> On Thu, 11 Jun 2020 at 08:58, Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Lorenzo,
>>
>> I'm glad that it worked out somehow, but I'd still like to understand
>> what went wrong, so it will work more smoothly for future users. I
>> double-checked and we even test AvroSerializer with logical types, so
>> I'm a bit puzzled.
>>
>> Could you attach GlHeader, or at least show us what GlHeader#conversions
>> looks like? I want to exclude the possibility that the source generator
>> screwed up.
>>
>> Concerning object reuse: you need to treat all POJOs as immutable (I'm
>> assuming that's what you meant in your description), but you should also
>> never cache values, like:
>>
>> class ShiftElements implements FlatMapFunction<Object, Object> {
>>   Object lastElement;
>>
>>   public void flatMap(Object newElement, Collector<Object> out) {
>>     out.collect(lastElement);
>>     lastElement = newElement; // <- never cache with enableObjectReuse
>>   }
>> }
>>
>> (excuse my ugly code)
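For readers following along, the object-reuse switch discussed here is set on the execution config before the pipeline is built; a minimal sketch (standard ExecutionConfig API, shown for context):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // With reuse enabled, Flink may hand the same instance to chained operators
    // instead of copying it: treat records as read-only and never hold on to
    // references across invocations (see Arvid's warning above).
    env.getConfig().enableObjectReuse();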
>> On Thu, Jun 11, 2020 at 9:25 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> answering your other questions.
>>>
>>> Here is the stacktrace for case (1), when I try to read using specific
>>> records generated by the AVRO 1.8.2 plugin:
>>>
>>> java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>   at com.tenx.client.generalledger.event.GlHeader.put(GlHeader.java:125)
>>>   at org.apache.avro.generic.GenericData.setField(GenericData.java:690)
>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:119)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>   at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
>>>   at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
>>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>>>   at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
>>>   at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>>>   at org.apache.flink.formats.avro.AvroInputFormat.nextRecord(AvroInputFormat.java:165)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:323)
>>>
>>> I also tried generating the specific object with AVRO 1.9.2 (case 2),
>>> forcing it to use Joda time, but it still didn't work:
>>>
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-06-01T02:00:42.326Z
>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>   ... 7 more
>>>
>>> But in the second case, it seems the failure happens when Flink tries to
>>> make a copy of the record.
>>> So I followed your suggestion of enableObjectReuse() and *IT WORKS!*
>>>
>>> I am not sure I understand all the implications of object reuse in
>>> Flink, specifically.
>>> I am familiar with the general risks of mutable messages, and I always
>>> handle them as immutable even when they are POJOs: never mutating and
>>> forwarding the same record.
>>> Not sure whether there are other implications in Flink.
>>>
>>> Many thanks
>>> Lorenzo
>>>
>>> On Wed, 10 Jun 2020 at 17:52, Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Lorenzo,
>>>>
>>>> 1) I'm surprised that this doesn't work. I'd like to see that
>>>> stacktrace.
>>>>
>>>> 2) Cannot work like this, because we bundle Avro 1.8.2. You could
>>>> retest with dateTimeLogicalType='Joda' set, but then you will probably
>>>> see the same issue as 1).
>>>>
>>>> 3) I'm surprised that this doesn't work either. There has been a
>>>> codepath for GenericRecord since 2016 and it's covered in a test. From
>>>> the error description and the ticket, it looks like the issue is not
>>>> the AvroInputFormat, but the serializer. So it would probably work
>>>> with a different serializer (but that would cause back-and-forth type
>>>> transformation).
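The serializer angle can be checked outside Flink. A hedged, standalone sketch (not from the thread): Flink's AvroSerializer#copy delegates to Avro's deepCopy, and a SpecificData model that lacks the record's logical-type conversions is expected to fail just like the "Unknown datum type" frames above:

    import org.apache.avro.specific.SpecificData;

    static AccountEntries copyOf(AccountEntries rec) {
        // Mirrors what AvroSerializer#copy does; with a plain SpecificData model
        // (no Joda/java.time conversions registered) this is expected to throw
        // AvroRuntimeException: Unknown datum type.
        return SpecificData.get().deepCopy(AccountEntries.getClassSchema(), rec);
    }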
>>>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:
>>>>
>>>>> Thanks Timo,
>>>>>
>>>>> the stacktrace with the 1.9.2-generated specific record is the
>>>>> following:
>>>>>
>>>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>>>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>>>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>>>>   at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>>>>   at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>>>>   at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>   at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>>>>   at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>>>>   at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>>>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>>>>   ... 7 more
>>>>>
>>>>> I reckon logical types might have been considered somewhat
>>>>> experimental since... ever. But, honestly, I've been using them in
>>>>> the Kafka/Java ecosystem as well as in Spark without too many
>>>>> problems.
>>>>>
>>>>> For my specific use case, the schema is given. Messages are produced
>>>>> by a 3rd party and we cannot change the schema (especially because
>>>>> it's a legit schema).
>>>>> I am desperately looking for a workaround.
>>>>>
>>>>> I had a similar issue with a Kafka source and AVRO records containing
>>>>> decimals and timestamps: timestamps worked, but not decimals. I was
>>>>> able to work around the problem using GenericRecords.
>>>>> But the Kafka source relies on AvroDeserializationSchema rather than
>>>>> AvroSerializer, and has no problem handling GenericRecords.
>>>>>
>>>>> I honestly find it very confusing to have different ways of handling
>>>>> AVRO deserialization inside Flink core components.
>>>>>
>>>>> Cheers
>>>>> Lorenzo
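The Kafka route Lorenzo describes can be sketched as follows (hedged: the topic name and properties are placeholders; AvroDeserializationSchema.forGeneric deserializes straight to GenericRecord):

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Schema schema = AccountEntries.getClassSchema();
    Properties props = new Properties(); // bootstrap.servers etc. go here
    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "account-entries",                            // hypothetical topic name
        AvroDeserializationSchema.forGeneric(schema), // reads GenericRecord directly
        props);
    DataStream<GenericRecord> stream = env.addSource(consumer);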
>>>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>>>
>>>>>> Hi Lorenzo,
>>>>>>
>>>>>> as far as I know we don't support Avro's logical types in Flink's
>>>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports
>>>>>> the 1.8.2 version of logical types but might be incompatible with
>>>>>> 1.9.2.
>>>>>>
>>>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>>>
>>>>>> Could you send us the full stack trace? I think this should actually
>>>>>> work, because specific records are handled as POJOs and those should
>>>>>> be able to also deal with logical types' classes through Kryo.
>>>>>>
>>>>>> Reg 3) Generic record:
>>>>>>
>>>>>> It would be great if we can make this option possible. We could
>>>>>> include it in the next minor bugfix release.
>>>>>>
>>>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>>>> shortcomings such that Flink can properly support them as well.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>>>
>>>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>>>> > Hi,
>>>>>> >
>>>>>> > I need to continuously ingest AVRO files as they arrive.
>>>>>> > Files are written by a Kafka Connect S3 sink, but S3 is not the
>>>>>> > point here. I started by trying to ingest a static bunch of files
>>>>>> > from the local fs first, and I am having weird issues with AVRO
>>>>>> > deserialization.
>>>>>> >
>>>>>> > I have to say, the records contain logical types: timestamp-millis
>>>>>> > and decimals.
>>>>>> >
>>>>>> > To keep it simple, I extracted the AVRO schema from the data files
>>>>>> > and used avro-maven-plugin to generate POJOs.
>>>>>> > I tried multiple combinations, all with no luck.
>>>>>> >
>>>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>>>> >
>>>>>> > Path in = new Path(sourceBasePath);
>>>>>> > AvroInputFormat<AccountEntries> inputFormat = new AvroInputFormat<>(in,
>>>>>> >     AccountEntries.class);
>>>>>> > DataStream<AccountEntries> accountEntries = env
>>>>>> >     .readFile(inputFormat, sourceBasePath,
>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>>>> >
>>>>>> > *Result*
>>>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>>>>>> > org.joda.time.DateTime
>>>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>>>> >
>>>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>>>> > Same code as above, but the AVRO POJOs are generated with AVRO 1.9.2.
>>>>>> >
>>>>>> > *Result*
>>>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>>>>>> >
>>>>>> > 3) Generic record
>>>>>> > I am getting the Schema from the generated specific record, for
>>>>>> > convenience, but I am not using the generated POJO as the record.
>>>>>> > I also followed the suggestions in this Flink blog post
>>>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>>>> > to explicitly specify the TypeInfo with returns(...):
>>>>>> >
>>>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>>>> > Schema schema = AccountEntries.getClassSchema();
>>>>>> > AvroInputFormat<GenericRecord> inputFormat = new AvroInputFormat<>(in,
>>>>>> >     GenericRecord.class);
>>>>>> > DataStream<GenericRecord> accountEntries = env
>>>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>>>> >
>>>>>> > *Result*
>>>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>>>> > The class is not a proper class. It is either abstract, an interface,
>>>>>> > or a primitive type.
>>>>>> >
>>>>>> > This looks like a bug.
>>>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>>>> > and I will try to submit a fix, but this still does not solve my
>>>>>> > problem, as I am using a managed Flink that I cannot update.
>>>>>> > I cannot believe there is no workaround. I do not think I'm trying
>>>>>> > to do anything bizarre. Am I?
>>>>>> >
>>>>>> > Any ideas?
>>>>>> > Am I missing something obvious?
>>>>>> >
>>>>>> > Cheers
>>>>>> > Lorenzo
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>> Ji (Toni) Cheng
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
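Putting together the combination this thread reports as working (specific records generated with the AVRO 1.9.2 plugin, read via AvroInputFormat, with object reuse enabled so the failing per-element copy is skipped), a hedged end-to-end sketch; sourceBasePath and FILE_SCAN_INTERVAL_MS stand in for the constants from the original post:

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.avro.AvroInputFormat;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    String sourceBasePath = "/path/to/avro/files"; // placeholder
    long FILE_SCAN_INTERVAL_MS = 1_000L;           // placeholder

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Skips the per-element deepCopy that fails on java.time.Instant (case 2);
    // records must then be treated as immutable (see the discussion above).
    env.getConfig().enableObjectReuse();

    AvroInputFormat<AccountEntries> inputFormat =
        new AvroInputFormat<>(new Path(sourceBasePath), AccountEntries.class);
    DataStream<AccountEntries> accountEntries = env
        .readFile(inputFormat, sourceBasePath,
            FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);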