Hi Xiaolong,

I think you may need to check the error log in the Flink cluster to find out the root cause.
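For example, something along these lines should show the JobManager log and whether its container was restarted or killed (just a sketch: the namespace comes from your FlinkDeployment example and `<jobmanager-pod>` is a placeholder, so please adjust both to your setup):

```
# Sketch only: "xxx" is the namespace from the FlinkDeployment example below,
# and <jobmanager-pod> is a placeholder for the actual JobManager pod name.
kubectl -n xxx get pods                                          # find the JobManager / TaskManager pods
kubectl -n xxx logs <jobmanager-pod> --tail=500                  # current JobManager log
kubectl -n xxx logs <jobmanager-pod> --previous --tail=500       # log of the previous container, if it restarted
kubectl -n xxx describe pod <jobmanager-pod>                     # last state / exit reason (e.g. OOMKilled)
kubectl -n xxx get events --sort-by=.metadata.creationTimestamp  # recent cluster events
```

The `command terminated with exit code 137` at the end of your log usually means the process was killed with SIGKILL (128 + 9); on Kubernetes that is often an out-of-memory kill, so the `describe pod` and events output are worth checking as well.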
Best,
Shammon FY

On Tue, Jul 4, 2023 at 3:38 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

The Flink web UI is fine until I run the Hive query. After that the Flink deployment is down and the web UI is not accessible.

On Tue, Jul 4, 2023 at 9:13 AM Shammon FY <zjur...@gmail.com> wrote:

Hi Xiaolong,

From the exception it seems that the Flink session cluster is not running properly. Can you visit the Flink web UI and check that everything is OK?

Best,
Shammon FY

On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

Hi,

I've tested the Flink SQL Gateway to run some simple Hive queries but met some exceptions.

Environment Description:
Run on: Kubernetes
Deployment Mode: Session Mode (created by a flink-kubernetes-operator)

Steps to run:

1. Apply a `FlinkDeployment` for a Flink session cluster to the Flink operator:
```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster-example
  namespace: xxx
spec:
  image: xxx/flink:1.17-sql-gateway-dev
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.max-parallelism: "1000"
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: xxx
    execution.checkpointing.interval: 1m
    execution.checkpointing.timeout: 30m
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: xxx
    akka.framesize: 20971520b
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    taskmanager.memory.managed.fraction: "0.2"
    kubernetes.hadoop.conf.config-map.name: xxx
  serviceAccount: default
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: default
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
```
This image has been built with the Hadoop dependency and an existing Hadoop ConfigMap.

2. Log in to the job-manager pod and run the following:
`./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`

3. Start a beeline session, connect to the SQL Gateway, and run a simple Hive query:
`select count(1) from simple_demo_output where dt = '2021-08-14';`

4. The SQL Gateway fails with the following logs:
```
2023-07-03 06:27:11,078 INFO  org.apache.flink.client.program.rest.RestClusterClient [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).

2023-07-03 06:27:15,092 INFO  org.apache.flink.client.program.rest.RestClusterClient [] - Successfully submitted job 'collect' (4c99c40392cb935d3df94891655d2ce5) to 'http://flink-session-cluster-example-rest.realtime-streaming:8081'.

2023-07-03 06:27:15,879 ERROR org.apache.flink.table.gateway.service.operation.OperationManager [] - Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
    at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
    at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
    at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
    at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'collect'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95) ~[flink-table-planner_2.12-1.17.1.jar:1.17.1]
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
    ... 13 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be initialized
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [LocalFencedMessage(863401565fbee9bb5587a17d5279473a, LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
    at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

End of exception on server side>]
    at java.util.concurrent.CompletableFuture.reportGet(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.get(Unknown Source) ~[?:?]
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:88) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:135) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:87) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    ... 1 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [LocalFencedMessage(863401565fbee9bb5587a17d5279473a, LocalRpcInvocation(JobMasterGateway.requestJob(Time)))] from sender [Actor[akka://flink/temp/jobmanager_2$4d]] to recipient [Actor[akka://flink/user/rpc/jobmanager_2#-1135810317]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
    at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    ... 3 more

command terminated with exit code 137
```

And then the `flinkdeployment` goes wrong and cannot recover itself.