Hi Xiaolong,

As Shammon says, I think you should check the exception info of the Flink cluster first to confirm the root cause.
Best,
Ron

Shammon FY <zjur...@gmail.com> wrote on Tue, Jul 4, 2023 at 16:44:

Hi Xiaolong,

I think you may need to check the error log in the Flink cluster to find out the root cause.

Best,
Shammon FY

On Tue, Jul 4, 2023 at 3:38 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

The Flink web UI is fine until I run the Hive query. After that, the Flink deployment is down and the web UI is no longer accessible.

On Tue, Jul 4, 2023 at 9:13 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Xiaolong,

From the exception, it seems that the Flink session cluster is not running properly. Can you visit the Flink web UI and check whether everything is OK?

Best,
Shammon FY

On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

Hi,
I've tested the Flink SQL Gateway by running some simple Hive queries, but met some exceptions.

Environment description:
Run on: Kubernetes
Deployment mode: Session mode (created by a flink-kubernetes-operator)

Steps to run:
1.
Apply a `FlinkDeployment` of a Flink session cluster to the Flink operator:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster-example
  namespace: xxx
spec:
  image: xxx/flink:1.17-sql-gateway-dev
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.max-parallelism: "1000"
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: xxx
    execution.checkpointing.interval: 1m
    execution.checkpointing.timeout: 30m
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: xxx
    akka.framesize: 20971520b
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    taskmanager.memory.managed.fraction: "0.2"
    kubernetes.hadoop.conf.config-map.name: xxx
  serviceAccount: default
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: default
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
```

This image has been built with a Hadoop dependency and an existing Hadoop ConfigMap.

2. Log in to the job-manager pod and run the following:

```
./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf
```

3. Start a beeline session, connect to the SQL gateway, and run a simple Hive query:

```
select count(1) from simple_demo_output where dt = '2021-08-14';
```

4. The SQL gateway goes wrong with the following logs:

```
2023-07-03 06:27:11,078 INFO  org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
2023-07-03 06:27:15,092 INFO  org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (4c99c40392cb935d3df94891655d2ce5) to 'http://flink-session-cluster-example-rest.realtime-streaming:8081'.

2023-07-03 06:27:15,879 ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.

org.apache.flink.table.api.TableException: Failed to execute sql
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[flink-table-planner_2.12-1.17.1.jar:1.17.1]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	... 13 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [LocalFencedMessage(863401565fbee9bb5587a17d5279473a, LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
	at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

End of exception on server side>]
	at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:88) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:135) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	... 1 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [LocalFencedMessage(863401565fbee9bb5587a17d5279473a, LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
	at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) ~[flink-dist-1.17.1.jar:1.17.1]
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	... 3 more

command terminated with exit code 137
```

And then the `FlinkDeployment` goes wrong and cannot recover itself.
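For step 3, the thread does not show the actual beeline connection string; a sketch of what it might look like, assuming beeline is run from the same pod and the gateway's HiveServer2 endpoint is left on its default Thrift port 10000 (host, database, and auth mode here are illustrative, not taken from the thread):

```shell
# Connect beeline to the Flink SQL Gateway's HiveServer2 endpoint.
# localhost:10000 is an assumption: 10000 is the endpoint's default port,
# and this presumes beeline runs inside the job-manager pod.
./beeline -u "jdbc:hive2://localhost:10000/default;auth=noSasl"
```

From that session, the failing query from the thread (`select count(1) from simple_demo_output where dt = '2021-08-14';`) can be re-run once the cluster-side issue is fixed.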
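As both replies suggest, the next step is to look at the JobManager's own logs and pod state rather than the gateway output. Note that exit code 137 means the process received SIGKILL, which on Kubernetes is most often the OOM killer. A minimal sketch with `kubectl` (the namespace `realtime-streaming` is taken from the REST URL in the log; the pod name placeholder and label selectors are assumptions to adjust for your deployment):

```shell
# Tail the JobManager logs of the session cluster; the label selectors
# are those typically set by Flink's native Kubernetes integration.
kubectl -n realtime-streaming logs \
  -l app=flink-session-cluster-example,component=jobmanager --tail=500

# If the JobManager container already restarted, fetch the previous
# container's logs, which usually contain the actual failure.
kubectl -n realtime-streaming logs <jobmanager-pod> --previous

# Inspect events and the last container state; "OOMKilled" with exit
# code 137 would point at memory limits rather than a Flink bug.
kubectl -n realtime-streaming describe pod <jobmanager-pod>
```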