Hi,

I've been using HDFS with Flink for checkpoint and savepoint storage, which
works perfectly fine. Now I have another use case where I want to read from
and write to HDFS in the application code as well. For this, I'm using the
"pyarrow" library, which is already installed with PyFlink as a dependency.

According to the pyarrow documentation [1], HADOOP_HOME and CLASSPATH
environment variables are mandatory. As per the Flink documentation [2],
HADOOP_CLASSPATH must be set.
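
A minimal sketch of this kind of access, assuming the NameNode host and port
that appear in the HDFS URLs of the YAML further down. The helper names
missing_hadoop_env and connect_hdfs are hypothetical, not part of pyarrow or
Flink, and this is an illustration rather than the actual job code:

```python
import os

# The two variables named above as mandatory for pyarrow's HDFS support.
REQUIRED_VARS = ("HADOOP_HOME", "CLASSPATH")


def missing_hadoop_env(env=None):
    """Return the required variables that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def connect_hdfs(host="hdfs-namenode-0.hdfs-namenodes.hdfs", port=8020):
    """Open an HDFS connection through pyarrow (hypothetical helper).

    Assumes libhdfs and a JVM are available in the container at runtime.
    """
    missing = missing_hadoop_env()
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    # Imported lazily so the environment check above works without pyarrow.
    from pyarrow import fs
    return fs.HadoopFileSystem(host, port=port)
```

Calling connect_hdfs() inside the job would then return a filesystem object
whose open_input_stream() and open_output_stream() methods can be used for
reading and writing.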

I'm using the Flink Kubernetes Operator to deploy my application, and the
issue arises only in native mode. When I deploy the application with all the
variables above, the JobManager starts up but the TaskManager fails to start
with the following error from Kubernetes:

MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap
"hadoop-config-flink-job" not found

It also seems to set a HADOOP_CONF_DIR environment variable on the
TaskManager with the value "/opt/hadoop/conf", which doesn't exist because my
Hadoop installation is elsewhere. If I run the job in standalone mode,
everything works fine, as the TaskManager doesn't look for a
"hadoop-config-volume" to mount. Here's the YAML file for reference:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-job
  namespace: flink
spec:
  image: <IMAGE_NAME>
  imagePullPolicy: Always
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/savepoints
    state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenodes.hdfs:8020/flink-data/ha
    execution.checkpointing.interval: "3s"
    execution.checkpointing.unaligned: "true"
    execution.checkpointing.timeout: "30m"
  serviceAccount: flink
  podTemplate:
    spec:
      imagePullSecrets:
        - name: <IMAGE_PULL_SECRET>
      containers:
        - name: flink-main-container
          env:
            - name: HADOOP_HOME
              value: /hadoop-3.2.1
            - name: CLASSPATH
              value: <redacted>
            - name: HADOOP_CLASSPATH
              value: <redacted>
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.17.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["python", "-pym", "flink_job"]
    parallelism: 1
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0

Please note that I've also tried installing Hadoop under "/opt/" and
symlinking the "conf" directory to the path expected by HADOOP_CONF_DIR. This
didn't work either.
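
To compare what the JobManager and TaskManager containers actually see, a
small diagnostic along these lines could be run inside each pod. This is only
a sketch: describe_hadoop_conf is a hypothetical helper, and it merely reports
the environment without changing how Flink mounts the Hadoop configuration:

```python
import os


def describe_hadoop_conf(env=None):
    """Report HADOOP_HOME, HADOOP_CONF_DIR and whether the latter is a directory."""
    env = os.environ if env is None else env
    conf_dir = env.get("HADOOP_CONF_DIR")
    return {
        "HADOOP_HOME": env.get("HADOOP_HOME"),
        "HADOOP_CONF_DIR": conf_dir,
        # False when the variable is unset or points at a missing path.
        "conf_dir_exists": bool(conf_dir) and os.path.isdir(conf_dir),
    }
```

Printing describe_hadoop_conf() in both containers shows whether
HADOOP_CONF_DIR is set and whether the path it names is present in the image.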

As mentioned before, if I add "mode: standalone", this job runs without any
problem. But since the autoscaling feature only works in native mode, I need
to get it working there. Any help is appreciated.

Versions:
Flink - 1.17.1
PyFlink - 1.17.1
Flink Kubernetes Operator - 1.5.0


[1] https://arrow.apache.org/docs/python/filesystems.html#hadoop-distributed-file-system-hdfs
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/advanced/#hadoop-dependencies


Thanks,
Sunny
