Hi Enric,

You can try using a PersistentVolumeClaim (PVC) on your Kubernetes cluster as 
the backing storage for the JobResultStore, instead of a hostPath from the 
underlying node, and see if that works. The hostPath in your manifest points at 
a Windows directory surfaced through Docker Desktop, and the JobManager may not 
be able to access it with the permissions Flink needs, which would explain the 
"base directory of the JobResultStore isn't accessible" error.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-data-pvc
spec:
  resources:
    requests:
      storage: 10Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
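
Before deploying Flink, it is worth applying the claim and confirming it 
actually binds (this assumes the manifest above is saved as 
flink-data-pvc.yaml and that your cluster has a default StorageClass that can 
satisfy the request). Also note that with accessModes ReadWriteOnce, every pod 
mounting the claim must be scheduled onto the same node; if your JobManager and 
TaskManagers spread across nodes, you may need ReadWriteMany or a networked 
filesystem instead.

```shell
# Create the PVC and check its status (assumes flink-data-pvc.yaml holds the manifest above)
kubectl apply -f flink-data-pvc.yaml
kubectl get pvc flink-data-pvc
# STATUS should show "Bound"; a claim stuck in "Pending" usually means no
# StorageClass can satisfy it, and the Flink pods would then fail to start
```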


Then edit your manifest so that 
spec.podTemplate.spec.volumes[].persistentVolumeClaim.claimName references this 
PVC instead of the hostPath volume:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-reactive-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    scheduler-mode: REACTIVE
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: file:///flink-data/savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
          - mountPath: /flink-data
            name: flink-volume
      volumes:
      - name: flink-volume
        persistentVolumeClaim:
          claimName: flink-data-pvc
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
  mode: standalone
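
Once the deployment comes up, you can verify that the volume is actually 
mounted and writable from the JobManager, since that is exactly what the 
JobResultStore failed on. A quick sketch (the pod name is a placeholder you 
need to fill in, and the label selector may differ in your setup; the operator 
typically labels pods with the deployment name):

```shell
# List the pods of the deployment, then try writing into the mounted volume
kubectl get pods -l app=basic-reactive-example
kubectl exec -it <jobmanager-pod-name> -- sh -c 'touch /flink-data/.rw-test && ls -la /flink-data'
# If the touch fails with "Permission denied", the JobResultStore error will recur
```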


Naci

> On 11. Jul 2024, at 05:40, Enric Ott <243816...@qq.com> wrote:
> 
> Hi, Community:
>   I have encountered a problem when deploying the reactive Flink scheduler on 
> Kubernetes with Flink Kubernetes Operator 1.6.0; the manifest and exception 
> stack trace are listed below.
> Any clues would be appreciated.
> 
> ################################################################################
> #  Licensed to the Apache Software Foundation (ASF) under one
> #  or more contributor license agreements.  See the NOTICE file
> #  distributed with this work for additional information
> #  regarding copyright ownership.  The ASF licenses this file
> #  to you under the Apache License, Version 2.0 (the
> #  "License"); you may not use this file except in compliance
> #  with the License.  You may obtain a copy of the License at
> #
> #      http://www.apache.org/licenses/LICENSE-2.0
> #
> #  Unless required by applicable law or agreed to in writing, software
> #  distributed under the License is distributed on an "AS IS" BASIS,
> #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> #  See the License for the specific language governing permissions and
> # limitations under the License.
> ################################################################################
> 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-reactive-example
> spec:
>   image: flink:1.17
>   flinkVersion: v1_17
>   flinkConfiguration:
>     scheduler-mode: REACTIVE
>     taskmanager.numberOfTaskSlots: "2"
>     state.savepoints.dir: file:///flink-data/savepoints
>     state.checkpoints.dir: file:///flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: file:///flink-data/ha
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   podTemplate:
>     spec:
>       containers:
>         - name: flink-main-container
>           volumeMounts:
>           - mountPath: /flink-data
>             name: flink-volume
>       volumes:
>       - name: flink-volume
>         hostPath:
>           # directory location on host
>           path: /run/desktop/mnt/host/c/Users/24381/Documents/
>           # this field is optional
>           type: DirectoryOrCreate
>   job:
>     jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
>     parallelism: 2
>     upgradeMode: savepoint
>     state: running
>     savepointTriggerNonce: 0
>   mode: standalone
> 
> 
> ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal 
> error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> The base directory of the JobResultStore isn't accessible. No dirty 
> JobResults can be restored.
>       at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
>       at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) [?:?]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown 
> Source) [?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> [?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> [?:?]
>       at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.IllegalStateException: The base directory of the 
> JobResultStore isn't accessible. No dirty JobResults can be restored.
>       at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:199)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
>  ~[flink-dist-1.17.1.jar:1.17.1]
>       ... 4 more
> 
