Hi Lorenzo,

Looking at the stack trace, the issue is that copying a record between
chained operators uses the serializer directly. So you need to
enableObjectReuse() [1] to avoid that copy. Just make sure that you are not
modifying/caching data after emitting it anywhere in your pipeline (except
in Flink managed state).
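
A minimal sketch (assuming the usual way of obtaining the environment):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// With object reuse enabled, records are handed to the next chained
// operator as-is, so AvroSerializer#copy() (which does the failing
// deepCopy) is never called.
env.getConfig().enableObjectReuse();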

Then, it should be possible to use a map directly after the source to
convert the GenericRecord into a POJO, and I'd assume that a different
serializer will be picked.
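
Untested sketch; AccountEntriesPojo and the field names are made up for
illustration:

DataStream<AccountEntriesPojo> pojos = accountEntries
    // convert the GenericRecord into a plain POJO right after the source,
    // so that downstream operators use Flink's POJO serializer instead of
    // the AvroSerializer
    .map(record -> new AccountEntriesPojo(
        (Long) record.get("timestamp"),
        record.get("accountId").toString()))
    // needed because the lambda's return type is erased
    .returns(AccountEntriesPojo.class);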

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

On Wed, Jun 10, 2020 at 11:13 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
wrote:

> Hi Arvid,
>
> I confirm that in case 3) the problem is the AvroSerializer.
>
> How can I use a different serializer with AvroFileFormat?
>
> I would be happy to get the file ingestion working and, immediately after,
> map to a hand-written POJO to avoid any inefficiency or headache with
> moving GenericRecords around, if this is what you mean by back-and-forth
> type transformation.
>
> Cheers
> Lorenzo
>
> On Wed, 10 Jun 2020, 17:52 Arvid Heise, <ar...@ververica.com> wrote:
>
>> Hi Lorenzo,
>>
>> 1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
>>
>> 2) cannot work like this, because we bundle Avro 1.8.2. You could retest
>> with dateTimeLogicalType='Joda' set, but then you will probably see the
>> same issue as in 1).
>>
>> 3) I'm surprised that this doesn't work either. There has been a codepath
>> for GenericRecord since 2016, and it's covered in a test. From the error
>> description and the ticket, it looks like the issue is not the
>> AvroInputFormat but the serializer. So it would probably work with a
>> different serializer (but that would cause back-and-forth type
>> transformation).
>>
>> On Wed, Jun 10, 2020 at 4:31 PM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>> Thanks Timo,
>>>
>>> the stack trace with the 1.9.2-generated specific record is the following:
>>>
>>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
>>> at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:325)
>>> Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type java.time.Instant: 2020-06-01T02:00:42.105Z
>>> at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:909)
>>> at org.apache.avro.specific.SpecificData.getSchemaName(SpecificData.java:420)
>>> at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:871)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1302)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.avro.generic.GenericData.deepCopyRaw(GenericData.java:1283)
>>> at org.apache.avro.generic.GenericData.deepCopy(GenericData.java:1221)
>>> at org.apache.flink.formats.avro.typeutils.AvroSerializer.copy(AvroSerializer.java:242)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
>>> ... 7 more
>>>
>>>
>>> I reckon logical types might have been considered somewhat experimental
>>> since... ever. But, honestly, I've been using them in the Kafka/Java
>>> ecosystem, as well as in Spark, without too many problems.
>>>
>>> For my specific use case, the schema is given. Messages are produced by
>>> a 3rd party and we cannot change the schema (especially because it's a
>>> legit schema).
>>> I am desperately looking for a workaround.
>>>
>>> I had a similar issue with a Kafka source and AVRO records containing
>>> decimals and timestamps: timestamps worked, but decimals did not.
>>> I was able to work around the problem using GenericRecords.
>>> But Kafka source relies on AvroDeserializationSchema rather than
>>> AvroSerializer, and has no problem handling GenericRecords.
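>>>
>>> For reference, this is roughly what I did on the Kafka side (simplified;
>>> topic name and consumer properties are placeholders):
>>>
>>> Schema schema = AccountEntries.getClassSchema();
>>> // AvroDeserializationSchema.forGeneric() emits GenericRecords directly
>>> FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
>>>     "account-entries",
>>>     AvroDeserializationSchema.forGeneric(schema),
>>>     kafkaProperties);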
>>>
>>> I honestly find it very confusing that Flink core components handle
>>> AVRO deserialization in different ways.
>>>
>>> Cheers
>>> Lorenzo
>>>
>>>
>>> On Wed, 10 Jun 2020 at 15:02, Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Lorenzo,
>>>>
>>>> as far as I know, we don't support Avro's logical types in Flink's
>>>> AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
>>>> 1.8.2 version of logical types but might be incompatible with 1.9.2.
>>>>
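>>>> For illustration, a minimal sketch of using it (assuming the generated
>>>> specific record class from this thread):
>>>>
>>>> // deserializes Avro bytes into Flink Rows, with Avro 1.8.2-style
>>>> // logical type support
>>>> AvroRowDeserializationSchema deser =
>>>>     new AvroRowDeserializationSchema(AccountEntries.class);
>>>>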
>>>> Reg 2) Specific record generated with AVRO 1.9.2 plugin:
>>>>
>>>> Could you send us the full stack trace? I think this should actually
>>>> work, because specific records are handled as POJOs, and those should
>>>> also be able to deal with the logical types' classes through Kryo.
>>>>
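>>>> If Kryo complains about those classes, registering a serializer for them
>>>> usually helps, e.g. (sketch; JodaDateTimeSerializer comes from the
>>>> de.javakaffee:kryo-serializers artifact):
>>>>
>>>> // let Kryo delegate Joda DateTime (used by Avro 1.8.2 logical types)
>>>> // to a dedicated serializer
>>>> env.getConfig().registerTypeWithKryoSerializer(
>>>>     DateTime.class, JodaDateTimeSerializer.class);
>>>>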
>>>> Reg 3) Generic record
>>>>
>>>> It would be great if we could make this option work. We could include a
>>>> fix in the next minor release.
>>>>
>>>> Sorry for the bad user experience. But IMHO logical types are still
>>>> experimental in Avro. Maybe 1.9.2 has finally fixed the biggest
>>>> shortcomings, such that Flink can properly support them as well.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1]
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
>>>>
>>>>
>>>> On 10.06.20 15:08, Lorenzo Nicora wrote:
>>>> > Hi,
>>>> >
>>>> > I need to continuously ingest AVRO files as they arrive.
>>>> > Files are written by a Kafka Connect S3 sink, but S3 is not the point
>>>> > here. I started by trying to ingest a static bunch of files from the
>>>> > local fs first, and I am having weird issues with AVRO deserialization.
>>>> >
>>>> > I have to say, the records contain logical types: timestamp-millis and
>>>> > decimal.
>>>> >
>>>> > To keep it simple, I extracted the AVRO schema from the data files and
>>>> > used avro-maven-plugin to generate POJOs.
>>>> > I tried multiple combinations, all with no luck:
>>>> >
>>>> > 1) Specific record generated with AVRO 1.8.2 plugin
>>>> >
>>>> > Path in = new Path(sourceBasePath);
>>>> > AvroInputFormat<AccountEntries> inputFormat =
>>>> >     new AvroInputFormat<>(in, AccountEntries.class);
>>>> > DataStream<AccountEntries> accountEntries = env
>>>> >     .readFile(inputFormat, sourceBasePath,
>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS);
>>>> >
>>>> > *Result*
>>>> > java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime
>>>> > (IIRC this is a known AVRO 1.8.2 issue)
>>>> >
>>>> >
>>>> > 2) Specific record generated with AVRO 1.9.2 plugin
>>>> > Same code as above but AVRO POJOs are generated with AVRO 1.9.2
>>>> >
>>>> > *Result*
>>>> > org.apache.avro.AvroRuntimeException: Unknown datum type
>>>> java.time.Instant
>>>> >
>>>> >
>>>> > 3) Generic record
>>>> > I am getting the Schema from the generated specific record, for
>>>> > convenience, but I am not using the generated POJO as record.
>>>> > I also followed the suggestions in this Flink blog post
>>>> > <https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro-generic>
>>>> > to explicitly specify the TypeInfo with returns(...):
>>>> >
>>>> > Path in = new Path(config.sourceFileSystemPath);
>>>> > Schema schema = AccountEntries.getClassSchema();
>>>> > AvroInputFormat<GenericRecord> inputFormat =
>>>> >     new AvroInputFormat<>(in, GenericRecord.class);
>>>> > DataStream<GenericRecord> accountEntries = env
>>>> >     .readFile(inputFormat, config.sourceFileSystemPath,
>>>> >         FileProcessingMode.PROCESS_CONTINUOUSLY, FILE_SCAN_INTERVAL_MS)
>>>> >     .returns(new GenericRecordAvroTypeInfo(schema));
>>>> >
>>>> >
>>>> > *Result*
>>>> > The class 'org.apache.avro.generic.GenericRecord' is not instantiable:
>>>> > The class is not a proper class. It is either abstract, an interface,
>>>> > or a primitive type.
>>>> >
>>>> > This looks like a bug.
>>>> > I raised the ticket <https://issues.apache.org/jira/browse/FLINK-18223>
>>>> > and I will try to submit a fix, but that still does not solve my problem,
>>>> > as I am using a managed Flink that I cannot update.
>>>> > I cannot believe there is no workaround. I do not think I'm trying to do
>>>> > anything bizarre. Am I?
>>>> >
>>>> > Any ideas?
>>>> > Am I missing something obvious?
>>>> >
>>>> > Cheers
>>>> > Lorenzo
>>>>
>>>>
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng
