Hi Dan,

Do you have an example job and some sample data to reproduce this problem? I couldn't reproduce it locally with a simple example job.

Cheers,
Till

On Thu, Dec 10, 2020 at 5:51 PM Dan Hill <quietgol...@gmail.com> wrote:

> Yea, the error makes sense and was an easy fix.
>
> Any idea what happened with the hidden stacktrace? The hidden stacktrace
> made this 100x more difficult.
>
> On Thu, Dec 10, 2020 at 12:59 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> It looks like the problem is that there's a problem in reading a null
>> value in the AvroRowDataDeserializationSchema (see below for the snippet of
>> code from Flink 1.11.1).
>> The problem is due to the fact that there's a bad typing of the source so
>> the call to createConverter() within the createNullableConverter() returns
>> null, creating a null on fieldConverters[i] and, in the end, a NullPointer
>> in fieldConverters[i].convert(). Does it make sense?
>>
>> static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
>>     final DeserializationRuntimeConverter[] fieldConverters =
>>         rowType.getFields().stream()
>>             .map(RowType.RowField::getType)
>>             .map(AvroRowDataDeserializationSchema::createNullableConverter)
>>             .toArray(DeserializationRuntimeConverter[]::new);
>>     final int arity = rowType.getFieldCount();
>>     return avroObject -> {
>>         IndexedRecord record = (IndexedRecord) avroObject;
>>         GenericRowData row = new GenericRowData(arity);
>>         for (int i = 0; i < arity; ++i) {
>>             row.setField(i, fieldConverters[i].convert(record.get(i)));
>>         }
>>         return row;
>>     };
>> }
>>
>> Best,
>> Flavio
>>
>> On Thu, Dec 10, 2020 at 8:39 AM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> One of the Exception instances finally reported a stacktrace. I'm not
>>> sure why it's so infrequent.
>>>
>>> java.lang.NullPointerException: null
>>>     at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:338) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.formats.avro.AvroRowDataSerializationSchema.lambda$createRowConverter$6827278$1(AvroRowDataSerializationSchema.java:177) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:251) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>     at org.apache.flink.formats.avro.AvroFileSystemFormatFactory$RowDataAvroWriterFactory$1.addElement(AvroFileSystemFormatFactory.java:247) ~[blob_p-97afea7f96212938a7f59355af26e877ab52777d-022ef71f40ed0889789b9e942825fbb7:?]
>>>     at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:498) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.filesystem.FileSystemTableSink$ProjectionBulkFactory$1.addElement(FileSystemTableSink.java:494) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:48) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:202) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at StreamExecCalc$793.processElement(Unknown Source) ~[?:?]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.runtime.operators.join.interval.EmitAwareCollector.collect(EmitAwareCollector.java:48) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.runtime.operators.join.interval.EmitAwareCollector.collect(EmitAwareCollector.java:28) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.lambda$removeExpiredRows$2(TimeIntervalJoin.java:411) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_265]
>>>     at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.removeExpiredRows(TimeIntervalJoin.java:408) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.runtime.operators.join.interval.TimeIntervalJoin.onTimer(TimeIntervalJoin.java:332) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin.onTimer(RowTimeIntervalJoin.java:30) ~[flink-table-blink_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.onEventTime(KeyedCoProcessOperator.java:95) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>>>
>>> On Wed, Dec 9, 2020 at 4:33 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>
>>>> In the Flink dashboard, my job is failing with a NullPointerException
>>>> but the Exception is not showing a stack trace. I do not see any
>>>> NullPointerExceptions in any of the flink-jobmanager and flink-taskmanager
>>>> logs.
>>>>
>>>> Is this a normal issue?
>>>>
>>>> [image: Screen Shot 2020-12-09 at 4.29.30 PM.png]
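
For anyone reading the thread later, here is a minimal plain-Java sketch of the mechanism Flavio describes in his message: a factory that returns null for a type it does not handle, a converter array that silently carries that null, and a NullPointerException that only appears once a row is converted. Everything in it (the class NullConverterSketch, the Converter interface, the type names, the fail-fast variant) is hypothetical and made up for illustration; it is not Flink's code and does not claim to match what the Avro format classes actually do.

```java
import java.util.Arrays;

// Hypothetical stand-in for the pattern discussed in the thread; not Flink code.
final class NullConverterSketch {

    // Assumed minimal converter shape, loosely modelled on the quoted snippet.
    interface Converter {
        Object convert(Object value);
    }

    // Returns null for a type name it does not recognise (the "bad typing" case).
    static Converter createConverter(String typeName) {
        switch (typeName) {
            case "BIGINT":
                return value -> ((Number) value).longValue();
            case "STRING":
                return value -> String.valueOf(value);
            default:
                return null;
        }
    }

    // Mirrors the stream-to-array construction: a null result ends up in the array.
    static Converter[] buildConverters(String... typeNames) {
        return Arrays.stream(typeNames)
                .map(NullConverterSketch::createConverter)
                .toArray(Converter[]::new);
    }

    // Variant that rejects an unsupported type when the converters are built.
    static Converter[] buildConvertersFailFast(String... typeNames) {
        return Arrays.stream(typeNames)
                .map(name -> {
                    Converter converter = createConverter(name);
                    if (converter == null) {
                        throw new IllegalArgumentException("Unsupported type: " + name);
                    }
                    return converter;
                })
                .toArray(Converter[]::new);
    }

    // Applies converter i to field i; a null converter throws NullPointerException here.
    static Object[] convertRow(Converter[] converters, Object... fields) {
        Object[] row = new Object[fields.length];
        for (int i = 0; i < fields.length; i++) {
            row[i] = converters[i].convert(fields[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        // Building succeeds even though the second type is unknown.
        Converter[] converters = buildConverters("BIGINT", "UNKNOWN");
        try {
            convertRow(converters, 42, "x");
        } catch (NullPointerException e) {
            System.out.println("Failed at conversion time: " + e);
        }
        try {
            buildConvertersFailFast("BIGINT", "UNKNOWN");
        } catch (IllegalArgumentException e) {
            System.out.println("Failed at build time: " + e.getMessage());
        }
    }
}
```

The fail-fast variant is only there to show the contrast: checking for a missing converter at build time gives an error that names the offending type, instead of a bare NullPointerException deep in the per-record path.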