Thanks Arvid,
now it makes sense.
Unfortunately, the problematic schema comes from a 3rd party we cannot
control; we have to ingest it and do some work with it before being able to
map out of it.
But at least now the boundary of the problem is clear.
Thanks to the whole community
Lorenzo
Hi Lorenzo,
I didn't mean to dismiss the issue, but it's not a matter of
incompatibility; it's a matter of unsound generated code. It will break
independently of Flink, since it apparently is a bug in the Avro compiler
1.8.2, so our options to fix it are limited.
What we should do is to bump the Avro version…
Hi Arvid,
Sorry, but saying the AVRO compiler setup is "broken" sounds like an easy
way of dismissing a problem ;)
I am using the official AVRO 1.8.2 Maven plugin with no customisation to
generate the code.
There might be some legit AVRO configurations that are incompatible with
Flink or something…
Hi Lorenzo,
Thank you for confirming my suspicion. It really means something is broken
in your Avro compiler setup and there is not much that we can do on our end.
Just for reference, we have a user.avsc [1] that is compiled [2] with
1.8.2 into this snippet [3] for our tests.
Look especially…
Hi Arvid,
I followed your instructions for the breakpoint in
SpecificDatumReader.readField *with AVRO 1.8.2*.
For all timestamp-millis fields (I have many):
Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos());
returns null, so…
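Spelled out, the check at the breakpoint is roughly this (a minimal sketch;
the field name "eventTimestamp" and the helper are mine, not from the actual
schema):

import org.apache.avro.Conversion;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

// Probe the logical-type conversion a generated record exposes for one field.
static void probeConversion(SpecificRecordBase record, String fieldName) {
    Schema.Field f = record.getSchema().getField(fieldName);
    // With the 1.8.2-generated classes this is null for every
    // timestamp-millis field, so Avro falls back to the raw Long.
    Conversion<?> conversion = record.getConversion(f.pos());
    System.out.println(fieldName + " -> " + conversion);
}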
Sorry, forget my last mail, that was half-finished.
Here is the real one:
Hi Lorenzo,
if you still have time to investigate:
Your stack trace shows that all expected code paths have been taken.
Conversions are there, although they look different than here; that can
be attributed to the Avro upgrade.
Hi Lorenzo,
if you still have time to investigate:
Your stack trace shows that all expected code paths have been taken.
Conversions are there, although they look different than here; that can
be attributed to the Avro upgrade.
@Override
protected void readField(Object r, Schema.Field f, Objec…
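The snippet above is cut off; the override in question is Avro's
SpecificDatumReader#readField, which in 1.9.x looks roughly like the
following (reconstructed, not verbatim; check the sources of your exact
Avro version):

@Override
protected void readField(Object r, Schema.Field f, Object oldDatum,
    ResolvingDecoder in, Object state) throws IOException {
  if (r instanceof SpecificRecordBase) {
    // Ask the generated record for the logical-type conversion of this field.
    Conversion<?> conversion = ((SpecificRecordBase) r).getConversion(f.pos());
    Object datum;
    if (conversion != null) {
      // timestamp-millis: the underlying Long is converted (e.g. to Joda DateTime).
      datum = readWithConversion(
          oldDatum, f.schema(), f.schema().getLogicalType(), conversion, in);
    } else {
      // No conversion registered: the raw Long is kept, which later surfaces
      // as the ClassCastException seen in this thread.
      datum = readWithoutConversion(oldDatum, f.schema(), in);
    }
    getData().setField(r, f.name(), f.pos(), datum);
  } else {
    super.readField(r, f, oldDatum, in, state);
  }
}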
Thanks Guowei,
setting format.setReuseAvroValue(false) with 1.8.2-generated records does
not solve the problem.
12:02:59,314 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937.
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime…
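For reference, this is how the suggestion was applied (a sketch; the path is
made up, AccountEntries is the 1.8.2-generated class):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

AvroInputFormat<AccountEntries> format =
    new AvroInputFormat<>(new Path("file:///path/to/input"), AccountEntries.class);
format.setReuseAvroValue(false); // do not reuse one record instance across reads

DataStream<AccountEntries> stream = env.createInput(format);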
Hi,
for 1.8.2 (case 1) you could try format.setReuseAvroValue(false);
Best,
Guowei
Lorenzo Nicora wrote on Thu, Jun 11, 2020 at 5:02 PM:
> Hi Arvid,
>
> thanks for the point about caching records. Gotcha!
>
> Sorry I cannot share the full schema or generated code. It's a 3rd party
> IP and we signed…
Hi Arvid,
thanks for the point about caching records. Gotcha!
Sorry I cannot share the full schema or generated code. It's a 3rd party IP
and we signed a meter-thick NDA... I think I can post snippets.
The schema is heavily nested, including arrays of other record types.
Types are primitives, or…
Hi,
I wrote a test for case 1, but it does not throw any exception. I use
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Could you check whether AccountEntries.class has the following code:
private static final org.apache.avro.Conversion[] conversions =
new org.apache.avro.Conversion[] { … };
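For comparison, with a timestamp-millis field the 1.8.2 compiler emits
roughly this shape (a sketch; the number and order of slots follow the
schema's field positions, and the accessor is quoted from memory, so treat
it as an approximation):

private static final org.apache.avro.Conversion<?>[] conversions =
    new org.apache.avro.Conversion<?>[] {
      null, // fields without a logical type get a null slot
      // timestamp-millis maps to org.joda.time.DateTime in Avro 1.8.x
      new org.apache.avro.data.TimeConversions.TimestampConversion(),
      null
    };

@Override
public org.apache.avro.Conversion<?> getConversion(int field) {
  return conversions[field];
}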
Hi,
I wrote a test for case 1, but it does not throw any exception. I use
the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test.
Best,
Guowei
Arvid Heise wrote on Thu, Jun 11, 2020 at 3:58 PM:
> Hi Lorenzo,
>
> I'm glad that it worked out somehow, but I'd still like to understand what
> went wrong…
Hi Lorenzo,
I'm glad that it worked out somehow, but I'd still like to understand what
went wrong, so it will work more smoothly for future users. I double
checked and we even test AvroSerializer with logical types, so I'm a bit
puzzled.
Could you attach GlHeader or at least show us how GlHeader#…
Hi Arvid,
answering your other questions:
Here is the stacktrace of case (1), when I try to read using specific
records generated by the AVRO 1.8.2 plugin:
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.joda.time.DateTime
at com.tenx.client.generalledger.event.GlHeader.…
Hi Lorenzo,
Looking at the stack trace, the issue is that copying a record uses the
serializer directly. So, you need to enableObjectReuse() [1] to avoid that.
Make sure that you are not modifying/caching data after emitting it in your
pipeline (except Flink managed state).
Then, it should be possible…
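Concretely, the switch lives on the execution config:

// Hand records through between chained operators instead of copying them
// with the AvroSerializer.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();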
Hi Arvid,
I confirm that in case 3) the problem is the AvroSerializer.
How can I use a different serializer with AvroFileFormat?
I would be happy to get the file ingestion working and, immediately after,
map to a hand-written POJO, to avoid any inefficiency or headache with
moving around GenericRecords…
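Something like this is what I have in mind (a sketch; MyPojo and its getters
are hypothetical):

DataStream<AccountEntries> records = env.createInput(
    new AvroInputFormat<>(new Path("file:///path/to/input"), AccountEntries.class));

// Map to a plain POJO right away; the Joda DateTime is flattened to epoch
// millis so no logical-type machinery travels through the pipeline.
DataStream<MyPojo> pojos = records
    .map(r -> new MyPojo(r.getId(), r.getCreatedAt().getMillis()))
    .returns(MyPojo.class);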
Hi Lorenzo,
1) I'm surprised that this doesn't work. I'd like to see that stacktrace.
2) cannot work like this, because we bundle Avro 1.8.2. You could retest
with dateTimeLogicalType='Joda' set, but then you will probably see the
same issue as 1)
3) I'm surprised that this doesn't work either.
Thanks Timo,
the stacktrace with the 1.9.2-generated specific records is the following:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:…)
Hi Lorenzo,
as far as I know we don't support Avro's logical types in Flink's
AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the
1.8.2 version of logical types but might be incompatible with 1.9.2.
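As a sketch of what [1] provides (the record class is the one from this
thread; the input bytes are assumed):

// Deserialize Avro-encoded bytes into Flink Rows; logical-type fields come
// out according to the bundled Avro 1.8.2 mapping.
AvroRowDeserializationSchema deser =
    new AvroRowDeserializationSchema(AccountEntries.class);
Row row = deser.deserialize(avroBytes); // throws IOException on bad input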
Regarding 2) Specific records generated with the AVRO 1.9.2 plugin:
Could you send us…