Hi Lorenzo,

1) I'm surprised that this doesn't work. I'd like to see that stacktrace.

2) This cannot work as is, because Flink bundles Avro 1.8.2. You could retest
with dateTimeLogicalType='Joda' set, but then you will probably see the
same issue as in 1).
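For reference, the plugin switch I mean is set in the avro-maven-plugin
configuration. A rough sketch (parameter name as in the 1.9.x plugin docs,
where it is called dateTimeLogicalTypeImplementation; please check the
version you actually use):

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.2</version>
  <configuration>
    <!-- generate Joda-Time (not java.time) accessors for timestamp logical types -->
    <dateTimeLogicalTypeImplementation>joda</dateTimeLogicalTypeImplementation>
  </configuration>
</plugin>
```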

3) I'm surprised that this doesn't work either. There has been a codepath
for GenericRecord since 2016 and it's covered in a test. From the error
description and the ticket, it looks like the issue is not the
AvroInputFormat but the serializer. So it would probably work with a
different serializer (although that would cause back-and-forth type
transformation).
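One workaround along those lines, since the failure happens when
AvroSerializer.copy meets a java.time.Instant field: map the logical-type
fields to primitives (e.g. epoch millis) right after the source, so the
records crossing the operator chain no longer contain Instant. A minimal,
hypothetical sketch (the class and helper name are mine, only java.time is
assumed):

```java
import java.time.Instant;

// Sketch: convert java.time.Instant fields to epoch millis before they
// reach Flink's AvroSerializer, which cannot deep-copy Instant values.
public class InstantToMillis {
    static long toEpochMillis(Instant ts) {
        // Null-safe conversion; 0L is an arbitrary placeholder for null.
        return ts == null ? 0L : ts.toEpochMilli();
    }

    public static void main(String[] args) {
        // The timestamp from the stack trace below.
        Instant ts = Instant.parse("2020-06-01T02:00:42.105Z");
        System.out.println(toEpochMillis(ts)); // prints 1590976842105
    }
}
```

Alternatively, ExecutionConfig offers enableForceKryo(), which would
sidestep the AvroSerializer entirely, at the cost of Kryo's slower
serialization.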

On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
wrote:

> Thanks Timo,
>
> the stacktrace with 1.9.2-generated specific file is the following
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type
> java.time.Instant: 2020-06-01T02:00:42.105Z
> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
> at
> org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
> at
> org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
> ... 7 more
>
>
> I reckon logical types might have been considered somehow experimental
> since...ever. But, honestly, I've been using them in the Kafka/Java
> ecosystem as well as in Spark without too many problems.
>
> For my specific use case, the schema is given. Messages are produced by a
> 3rd party and we cannot change the schema (especially because it's a legit
> schema).
> I am desperately looking for a workaround.
>
> I had a similar issue with a Kafka source and AVRO records containing
> decimals and timestamps. Timestamps worked but not decimals.
> I was able to work around the problem using GenericRecords.
> But the Kafka source relies on AvroDeserializationSchema rather than
> AvroSerializer, and has no problem handling GenericRecords.
>
> I honestly find it very confusing that there are different ways of handling
> AVRO deserialization inside Flink's core components.
>
> Cheers
> Lorenzo
>
>
> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Lorenzo,
>>
>> as far as I know we don't support Avro's logical types in Flink's
>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>
>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>
>> Could you send us the full stack trace? I think this should actually
>> work, because specific records are handled as POJOs and those should be
>> able to also deal with logical types' classes through Kryo.
>>
>> Reg 3) Generic record
>>
>> It would be great if we could make this option work. We could include
>> the fix in the next minor release.
>>
>> Sorry for the bad user experience. But IMHO logical types are still
>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>> shortcomings such that Flink can properly support them as well.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>
>>
>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>> > Hi,
>> >
>> > I need to continuously ingest AVRO files as they arrive.
>> > Files are written by an S3 Sink Kafka Connect but S3 is not the point
>> > here. I started trying to ingest a static bunch of files from local fs
>> > first and I am having weird issues with AVRO deserialization.
>> >
>> > I have to say, the records contain logical types, timestamps-ms and
>> > decimals.
>> >
>> > To keep it simple, I extracted the AVRO schema from the data files and
>> > used avro-maven-plugin to generate POJOs
>> > I tried multiple combinations, all with no luck
>> >
>> > 1) Specific record generated with AVRO 1.8.2 plugin
>> >
>> > Path in = new Path(sourceBasePath);
>> > AvroInputFormat<AccountEntries> inputFormat =
>> >     new AvroInputFormat<>(in, AccountEntries.class);
>> > DataStream<AccountEntries> accountEntries = env
>> >     .readFile(inputFormat, sourceBasePath,
>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>> >
>> > *Result*
>> > java.lang.ClassCastException: java.lang.Long cannot be cast to
>> > org.joda.time.DateTime
>> > (IIRC this is a known AVRO 1.8.2 issue)
>> >
>> >
>> > 2) Specific record generated with AVRO 1.9.2 plugin
>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>> >
>> > *Result*
>> > org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant
>> >
>> >
>> > 3) Generic record
>> > I am getting the Schema from the generated specific record, for
>> > convenience, but I am not using the generated POJO as record.
>> > I also followed the suggestions in this Flink blog post
>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>,
>> > to explicitly specify the TypeInfo with returns(...)
>> >
>> > Path in = new Path(config.sourceFileSystemPath);
>> > Schema schema = AccountEntries.getClassSchema();
>> > AvroInputFormat<GenericRecord> inputFormat =
>> >     new AvroInputFormat<>(in, GenericRecord.class);
>> > DataStream<GenericRecord> accountEntries = env
>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>> >
>> >
>> > *Result*
>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>> > The class is not a proper class. It is either abstract, an interface,
>> > or a primitive type.
>> >
>> > This looks like a bug.
>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>> > and I will try to submit a fix, but that still does not solve my problem,
>> > as I am using a managed Flink that I cannot update.
>> > I cannot believe there is no workaround. I do not think I'm trying to do
>> > anything bizarre. Am I?
>> >
>> > Any ideas?
>> > Am I missing something obvious?
>> >
>> > Cheers
>> > Lorenzo
>>
>>

-- 

Arvid Heise | Senior Java Developer