Flink version: 1.15.0
Deployment mode: native Kubernetes, application mode
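Symptom:
I deployed a Flink job with Kubernetes-based HA in native Kubernetes mode. After I manually deleted the job's Deployment, I found that the ConfigMaps the job uses for HA still existed. When I then started the job again without the -s parameter, the startup log showed that it recovered from the information recorded in those ConfigMaps.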
kubectl delete deployment flink-bdra-sql-application-job -n bdra-dev-flink-standalone
kubectl get configMap -n bdra-dev-flink-standalone
NAME                                                                         DATA   AGE
flink-bdra-sql-application-job-00000000000000000000000000000000-config-map   2      13m
flink-bdra-sql-application-job-cluster-config-map                            1      13m
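Both ConfigMap names in this listing share the prefix `flink-bdra-sql-application-job`, which is also the name of the deleted Deployment and is presumably the cluster ID. A minimal shell sketch, assuming the naming pattern `<cluster-id>-cluster-config-map` plus `<cluster-id>-<job-id>-config-map` inferred from this one listing (not taken from Flink's source), that derives both names and reports whether they survived the Deployment deletion. Both function names are hypothetical:

```shell
# Sketch: derive the HA ConfigMap names seen in the listing above.
# Assumption: the prefix is the kubernetes.cluster-id, and the pattern is
# inferred from this single listing rather than from Flink's code.
ha_configmap_names() {
  cluster_id="$1"
  # Default to the all-zero job ID that appears in the logs.
  job_id="${2:-00000000000000000000000000000000}"
  printf '%s\n' "${cluster_id}-cluster-config-map"
  printf '%s\n' "${cluster_id}-${job_id}-config-map"
}

# Hypothetical check: report which of those ConfigMaps still exist.
check_ha_configmaps() {
  namespace="$1"
  ha_configmap_names "$2" "$3" | while IFS= read -r name; do
    if kubectl get configmap "$name" -n "$namespace" >/dev/null 2>&1; then
      echo "still present: $name"
    else
      echo "absent: $name"
    fi
  done
}
```

For example, `check_ha_configmaps bdra-dev-flink-standalone flink-bdra-sql-application-job` would be expected to print "still present" for both names in the situation described above.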
I have the following questions:
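1. Is it true that, for a Flink job with Kubernetes-based HA to behave correctly, its Deployment must not be deleted manually, and the job must instead be stopped with the cancel or stop command? Based on this, I suspect that Flink's Kubernetes HA is built on top of ConfigMaps, and that, from the Kubernetes point of view, their lifecycle cannot carry an ownerReference the way the job's Service (svc) does.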
2. With Kubernetes-based HA, the Flink job ID is always 00000000000000000000000000000000. Is that by design?
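3. How does Flink's Kubernetes HA work, and what information does it store? I would like to study the relevant implementation. If anyone has design documents or related material, please reply to this email and let me know. Thanks.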
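The ownerReference hypothesis in question 1, and the "what is stored" part of question 3, can both be inspected with plain kubectl. A hedged sketch: the label selector `app=<cluster-id>,configmap-type=high-availability` is the one the Flink native Kubernetes documentation gives, as far as I recall, for deleting HA ConfigMaps manually; check it against the labels on your own ConfigMaps (via `show_labels`) before relying on it. All function names are hypothetical:

```shell
# Label selector for a cluster's HA ConfigMaps.
# Assumption: labels as recalled from the Flink docs; verify with show_labels.
ha_selector() {
  printf 'app=%s,configmap-type=high-availability' "$1"
}

# Usage: show_owner_refs <namespace> <configmap>
# Empty output means no ownerReferences, so deleting the Deployment
# does not garbage-collect this ConfigMap.
show_owner_refs() {
  kubectl get configmap "$2" -n "$1" -o jsonpath='{.metadata.ownerReferences}'
  echo
}

# Usage: show_labels <namespace> <configmap>
show_labels() {
  kubectl get configmap "$2" -n "$1" --show-labels
}

# Usage: show_data_keys <namespace> <configmap>
# Lists only the keys of the data section, not the (serialized) values.
show_data_keys() {
  kubectl get configmap "$2" -n "$1" \
    -o go-template='{{range $k, $v := .data}}{{$k}}{{"\n"}}{{end}}'
}

# Usage: delete_ha_configmaps <namespace> <cluster-id>
# Destructive: removes the HA metadata, so the next start will NOT recover.
delete_ha_configmaps() {
  kubectl delete configmap -n "$1" --selector="$(ha_selector "$2")"
}
```

Running `show_owner_refs` on the two ConfigMaps listed earlier is a direct way to confirm or refute the ownerReference guess, and `show_data_keys` shows what the two data entries of the job ConfigMap are.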
Restart command (without the -s parameter, i.e. the command itself does not ask to restore from any checkpoint or savepoint):
flink run-application --target kubernetes-application -c CalculateUv \
  -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p \
  -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 \
  -Dkubernetes.namespace=bdra-dev-flink-standalone \
  -Dkubernetes.service-account=bdra-dev-flink-standalone-sa \
  -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 \
  -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 \
  -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m \
  -Dstate.backend=filesystem \
  -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 \
  -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 \
  -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
  -Dhigh-availability.storageDir=file:///opt/flink/log/recovery \
  -Ds3.access-key=* -Ds3.secret-key=* \
  -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory \
  -Dmetrics.reporter.influxdb.scheme=http \
  -Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 \
  -Dmetrics.reporter.influxdb.db=flink_metrics \
  -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 \
  -Dkubernetes.rest-service.exposed.type=ClusterIP \
  -Dkubernetes.config.file=kube_config \
  -Dkubernetes.pod-template-file=pod-template.yaml \
  local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar
After the restart, the job was automatically recovered from the ConfigMaps:
2022-06-10 20:20:52,592 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs.
2022-06-10 20:20:52,654 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 .
2022-06-10 20:20:53,552 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2022-06-10 20:20:53,552 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2022-06-10 20:20:55,352 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5.
2022-06-10 20:20:55,370 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false)
2022-06-10 20:20:55,394 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
2022-06-10 20:20:55,438 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000).
2022-06-10 20:20:55,477 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
2022-06-10 20:20:55,558 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.
2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job-00000000000000000000000000000000-config-map'}.
2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 1 checkpoints from storage.
2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to retrieve checkpoint 64.
2022-06-10 20:20:55,760 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
2022-06-10 20:20:55,760 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.
2022-06-10 20:20:56,254 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 2 ms, total 1 pipelined regions currently.
2022-06-10 20:20:56,266 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5151a2cd
2022-06-10 20:20:56,267 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using application-defined state backend: org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5cfc3f54
2022-06-10 20:20:56,267 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2022-06-10 20:20:56,270 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@f3bc6a6
2022-06-10 20:20:57,763 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 00000000000000000000000000000000 from Checkpoint 64 @ 1654863136682 for 00000000000000000000000000000000 located at s3p://otsp-flink-lun01/flink-checkpoints/00000000000000000000000000000000/chk-64.
2022-06-10 20:20:57,797 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2022-06-10 20:20:57,798 INFO org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
2022-06-10 20:20:57,839 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: user_behavior[1].
2022-06-10 20:20:57,840 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: user_behavior[1] closed.
2022-06-10 20:20:57,847 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: user_behavior[1] from checkpoint.
2022-06-10 20:20:57,866 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@55d02910 for insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000).
2022-06-10 20:20:57,989 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='flink-bdra-sql-application-job-cluster-config-map'}.
2022-06-10 20:20:57,989 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for bdra-dev-flink-standalone/flink-bdra-sql-application-job-cluster-config-map, watching id:93e9b11c-a69c-425a-b35a-e65bc53ea5b1
2022-06-10 20:20:57,990 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (00000000000000000000000000000000) under job master id 92d47a56896398911c0078e0a2544608.
2022-06-10 20:20:57,996 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: user_behavior[1].
2022-06-10 20:20:57,998 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2022-06-10 20:20:57,999 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job insert-into_default_catalog.default_database.buy_cnt_per_hour (00000000000000000000000000000000) switched from state CREATED to RUNNING.
2022-06-10 20:20:58,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (1/8) (1b3b93d9f79b647fcc54d5253974d94f) switched from CREATED to SCHEDULED.
2022-06-10 20:20:58,050 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (2/8) (ebf049155617e3fc58d449eb9f3b0eb6) switched from CREATED to SCHEDULED.
2022-06-10 20:20:58,050 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: user_behavior[1] -> Calc[2] -> LocalWindowAggregate[3] (3/8) (118cb05f1cb95e9a569a1d99e5aef29e) switched from CREATED to SCHEDULED.
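To check quickly whether a start actually recovered from HA state rather than starting fresh, the JobManager log can be filtered for the recovery messages quoted above. A small sketch: the patterns are copied from this Flink 1.15.0 log and may differ in other versions, counts of zero are deliberately excluded, and `ha_recovery_lines` is a hypothetical name:

```shell
# Keep only log lines indicating that a job graph or checkpoint was
# recovered from HA metadata (non-zero counts only).
ha_recovery_lines() {
  grep -E 'Successfully recovered [1-9][0-9]* persisted job graphs|Found [1-9][0-9]* checkpoints in|Restoring job [0-9a-f]+ from Checkpoint [0-9]+'
}
```

Usage, with the names from this thread: `kubectl logs deployment/flink-bdra-sql-application-job -n bdra-dev-flink-standalone | ha_recovery_lines`. Assuming those messages only carry non-zero counts when something was actually recovered, a fresh start should produce no output.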