Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Thanks Arvid, now it makes sense. Unfortunately, the problematic schema comes from a 3rd party we cannot control; we have to ingest it and do some work with it before being able to map out of it. But at least now the boundary of the problem is clear. Thanks to the whole community, Lorenzo On Tue, 16

Re: Reading from AVRO files

2020-06-16 Thread Arvid Heise
Hi Lorenzo, I didn't mean to dismiss the issue, but it's not a matter of incompatibility, it's a matter of unsound generated code. It will break independently of Flink, since it apparently is a bug in the Avro compiler 1.8.2, so our options to fix it are limited. What we should do is to bump the A

Re: Reading from AVRO files

2020-06-16 Thread Lorenzo Nicora
Hi Arvid, Sorry, but saying the AVRO compiler setup is "broken" sounds like an easy way of dismissing a problem ;) I am using the official AVRO 1.8.2 Maven plugin with no customisation to generate the code. There might be some legit AVRO configurations that are incompatible with Flink or somethin

Re: Reading from AVRO files

2020-06-15 Thread Arvid Heise
Hi Lorenzo, Thank you for confirming my suspicion. It really means something is broken in your Avro compiler setup and there is not much that we can do on our end. Just for reference, we are having a user.avsc [1] being compiled [2] with 1.8.2 into this snippet [3] for our tests. Look especially

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid, I followed your instructions for the breakpoint in SpecificDatumReader.readField *with AVRO 1.8.2*. Conversion conversion = ((SpecificRecordBase) r).getConversion(f.pos()); returns null for all timestamp-millis fields (I have many), so.
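To illustrate why a null result from getConversion matters, here is a minimal stdlib-only sketch of the lookup described above. It is not Avro's code: `Conversion`, `Event`, and `readField` are hypothetical stand-ins, and `java.time.Instant` replaces `org.joda.time.DateTime` so the example runs without dependencies. The assumption is that the reader converts a raw value only when the record supplies a conversion for that field position, and otherwise hands the raw long to a setter that casts to the typed field.

```java
import java.time.Instant;

/** Stdlib-only sketch; none of these types are Avro's real classes. */
final class ConversionLookupSketch {

    /** Hypothetical stand-in for a conversion of a long-backed logical type. */
    interface Conversion {
        Object fromLong(long raw);
    }

    /** Hypothetical stand-in for a generated specific record with one field. */
    static final class Event {
        private final Conversion[] conversions; // one slot per field position
        private Instant timestamp;

        Event(Conversion[] conversions) {
            this.conversions = conversions;
        }

        Conversion getConversion(int pos) {
            return conversions[pos];
        }

        // Mimics the unchecked cast a generated put(int, Object) performs.
        void put(int pos, Object value) {
            if (pos != 0) {
                throw new IndexOutOfBoundsException("Bad index: " + pos);
            }
            timestamp = (Instant) value;
        }

        Instant getTimestamp() {
            return timestamp;
        }
    }

    /** Reader step: convert only when the record supplies a conversion. */
    static void readField(Event record, int pos, long raw) {
        Conversion conversion = record.getConversion(pos);
        Object datum = (conversion != null) ? conversion.fromLong(raw) : (Object) Long.valueOf(raw);
        record.put(pos, datum);
    }

    public static void main(String[] args) {
        Event ok = new Event(new Conversion[] {Instant::ofEpochMilli});
        readField(ok, 0, 0L);
        System.out.println("With conversion: " + ok.getTimestamp());

        Event broken = new Event(new Conversion[] {null});
        try {
            readField(broken, 0, 0L);
        } catch (ClassCastException e) {
            System.out.println("Without conversion: raw Long reached a typed field");
        }
    }
}
```

In this sketch the second record fails in `put` with a ClassCastException, which has the same shape as the "java.lang.Long cannot be cast to org.joda.time.DateTime" error reported elsewhere in the thread.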

Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Sorry, forget my last mail, that was half-finished. Here is the real one: Hi Lorenzo, if you still have time to investigate. Your stack trace shows that all expected code paths have been taken. Conversions are there, although they look different than here; that can be attributed to the avro

Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Hi Lorenzo, if you still have time to investigate. Your stack trace shows that all expected code paths have been taken. Conversions are there although they look different than here, but that can be attributed to the avro upgrade. @Override protected void readField(Object r, Schema.Field f, Objec

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Thanks Guowei, setting format.setReuseAvroValue(false) with 1.8.2-generated records does not solve the problem. 12:02:59,314 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job 46ea458aff2a496c4617a6b57e4de937. java.lang.ClassCastException: java.la

Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi, for 1.8.2 (case 1) you could try format.setReuseAvroValue(false); Best, Guowei Lorenzo Nicora wrote on Thu, Jun 11, 2020 at 5:02 PM: > Hi Arvid, > > thanks for the point about caching records. Gotcha! > > Sorry I cannot share the full schema or generated code. It's a 3rd party > IP and we signed

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid, thanks for the point about caching records. Gotcha! Sorry I cannot share the full schema or generated code. It's a 3rd party IP and we signed a meter-thick NDA... I think I can post snippets. The schema is heavily nested, including arrays of other record types. Types are primitives, or

Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi, I wrote a test for case 1 but it does not throw any exception. I used the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test. Could you check whether AccountEntries.class has the following code: private static final org.apache.avro.Conversion[] conversions = new org.apache.avr
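One way to run the check suggested above without reading the generated source is to inspect the static `conversions` array reflectively. The following is only a sketch under assumptions: `ConversionsInspector`, `emptySlots`, and `FakeRecord` are hypothetical names, and `FakeRecord` uses a plain Object[] in place of org.apache.avro.Conversion[] so the example runs with the standard library alone.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Stdlib-only sketch; FakeRecord is a hypothetical stand-in, not real Avro output. */
final class ConversionsInspector {

    /** Stand-in for a generated record class with three field positions. */
    static final class FakeRecord {
        // Slot 1 has a conversion; slots 0 and 2 have none.
        private static final Object[] conversions =
                new Object[] {null, "timestamp-millis conversion", null};
    }

    /** Returns the field positions whose slot in the static `conversions` array is null. */
    static List<Integer> emptySlots(Class<?> recordClass) throws ReflectiveOperationException {
        Field field = recordClass.getDeclaredField("conversions");
        field.setAccessible(true);
        Object[] conversions = (Object[]) field.get(null); // static field, so no instance is needed
        List<Integer> empty = new ArrayList<>();
        for (int i = 0; i < conversions.length; i++) {
            if (conversions[i] == null) {
                empty.add(i);
            }
        }
        return empty;
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println("Field positions without a conversion: " + emptySlots(FakeRecord.class));
    }
}
```

A null slot is expected for fields that carry no logical type, so the positions reported would still need to be compared against the schema to spot a logical-type field that is missing its conversion.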

Re: Reading from AVRO files

2020-06-11 Thread Guowei Ma
Hi, I wrote a test for case 1 but it does not throw any exception. I used the org.apache.flink.formats.avro.generated.JodaTimeRecord for the test. Best, Guowei Arvid Heise wrote on Thu, Jun 11, 2020 at 3:58 PM: > Hi Lorenzo, > > I'm glad that it worked out somehow, but I'd still like to understand what > w

Re: Reading from AVRO files

2020-06-11 Thread Arvid Heise
Hi Lorenzo, I'm glad that it worked out somehow, but I'd still like to understand what went wrong, so it will work more smoothly for future users. I double checked and we even test AvroSerializer with logical types, so I'm a bit puzzled. Could you attach GlHeader or at least show us how GlHeader#

Re: Reading from AVRO files

2020-06-11 Thread Lorenzo Nicora
Hi Arvid, answering your other questions. Here is the stack trace for case (1), when I try to read using specific records generated by the AVRO 1.8.2 plugin: java.lang.ClassCastException: java.lang.Long cannot be cast to org.joda.time.DateTime at com.tenx.client.generalledger.event.GlHeader.

Re: Reading from AVRO files

2020-06-10 Thread Arvid Heise
Hi Lorenzo, Looking at the stack trace, the issue is that copying a record uses the serializer directly. So, you need to enableObjectReuse() [1] to avoid that. Make sure that you are not modifying/caching data after emitting it in your pipeline (except Flink managed state). Then, it should be pos

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Hi Arvid, I confirm that in case 3) the problem is AvroSerializer. How can I use a different serializer with AvroFileFormat? I would be happy to get the file ingestion working and map to a hand-written POJO immediately afterwards, to avoid any inefficiency or headache with moving around GenericR

Re: Reading from AVRO files

2020-06-10 Thread Arvid Heise
Hi Lorenzo, 1) I'm surprised that this doesn't work. I'd like to see that stacktrace. 2) cannot work like this, because we bundle Avro 1.8.2. You could retest with dateTimeLogicalType='Joda' set, but then you will probably see the same issue as 1) 3) I'm surprised that this doesn't work either.

Re: Reading from AVRO files

2020-06-10 Thread Lorenzo Nicora
Thanks Timo, the stacktrace with 1.9.2-generated specific file is the following org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(O

Re: Reading from AVRO files

2020-06-10 Thread Timo Walther
Hi Lorenzo, as far as I know we don't support Avro's logical types in Flink's AvroInputFormat yet. E.g. AvroRowDeserializationSchema [1] supports the 1.8.2 version of logical types but might be incompatible with 1.9.2. Reg 2) Specific record generated with AVRO 1.9.2 plugin: Could you send u