James Mcguire created FLINK-30158: ------------------------------------- Summary: [Flink SQL][Protobuf] NullPointerException when querying Kafka topic using repeated or map attributes Key: FLINK-30158 URL: https://issues.apache.org/jira/browse/FLINK-30158 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem Affects Versions: 1.16.0 Reporter: James Mcguire
I am encountering a {{java.lang.NullPointerException}} when trying to use Flink SQL to query a Kafka topic whose protobuf schema uses {{repeated}} and/or {{map}} fields. *Steps to reproduce* # Use a protobuf definition that uses {{repeated}} and/or {{map}} fields. This protobuf schema covers a few of the problematic scenarios I ran into: {code:java} syntax = "proto3"; package example.message; option java_package = "com.example.message"; option java_multiple_files = true; message NestedType { int64 nested_first = 1; oneof nested_second { int64 one_of_first = 2; string one_of_second = 3; } } message Test { repeated int64 first = 1; map<string, NestedType> second = 2; } {code} # Query the topic. The query fails even when the problematic columns are excluded from the SELECT: {code:java} [ERROR] Could not execute SQL statement. Reason: org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException{code} Log file: {code:java} 2022-11-22 15:33:59,510 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement.org.apache.flink.table.client.gateway.SqlExecutionException: Error while retrieving result. at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:79) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.lang.RuntimeException: Failed to fetch next result at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?] 
at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.io.IOException: Failed to fetch job execution result at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?] at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c) at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?] at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) ~[?:?] 
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[?:?] at org.apache.flink.table.client.gateway.local.result.CollectResultBase$ResultRetrievalThread.run(CollectResultBase.java:75) ~[flink-sql-client-1.16.0.jar:1.16.0]Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bc869097009a92d0601add881a6b920c) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] 
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?] at java.lang.Thread.run(Thread.java:829) ~[?:?]Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$31(RestClusterClient.java:772) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?] at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:301) ~[flink-dist-1.16.0.jar:1.16.0] at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?] at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?] at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?] at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) ~[?:?] at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085) ~[?:?] at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?] 
at java.lang.Thread.run(Thread.java:829) ~[?:?]Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.0.jar:1.16.0] at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?] at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?] at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[?:?] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[?:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[?:?] 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[?:?] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[?:?] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[?:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[?:?] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[?:?] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[flink-scala_2.12-1.16.0.jar:1.16.0] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[flink-scala_2.12-1.16.0.jar:1.16.0] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[?:?] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-scala_2.12-1.16.0.jar:1.16.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[flink-scala_2.12-1.16.0.jar:1.16.0] at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[?:?] at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[?:?] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[?:?] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[?:?] at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[?:?] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[?:?] at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[?:?] at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[?:?] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) ~[?:?] at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) ~[?:?] at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) ~[?:?] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) ~[?:?] 
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) ~[?:?]Caused by: org.apache.flink.formats.protobuf.PbCodegenException: java.lang.NullPointerException at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:126) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[?:?] at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[?:?] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[?:?] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[?:?] at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0] at java.lang.Thread.run(Thread.java:829) ~[?:?]Caused by: java.lang.NullPointerException 
at org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.pbGetMessageElementCode(PbCodegenRowDeserializer.java:106) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.formats.protobuf.deserialize.PbCodegenRowDeserializer.codegen(PbCodegenRowDeserializer.java:84) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:109) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[flink-sql-protobuf-1.16.0.jar:1.16.0] at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[?:?] at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[?:?] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[?:?] at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[?:?] 
at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:286) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:94) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:692) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0] at java.lang.Thread.run(Thread.java:829) ~[?:?]{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)