Hello Rob. This workaround does indeed work!
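
For anyone finding this thread later, here is a rough sketch of the Dockerfile with the workaround applied. I am assuming the copy step can sit right after the FROM line and that nothing else in the image needs to change; the remaining steps are the same as in my original Dockerfile quoted further down.

###
FROM flink:1.18.1

# Workaround from Rob: make the flink-python jar visible to the app classloader
# by copying it from /opt/flink/opt into /opt/flink/lib.
# (Placement directly after FROM is an assumption; any point in the build should do.)
RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/

RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir
###

I left out the `RUN alias python=python3` line here because an alias set in one RUN step does not carry over to later steps, and the symlink already covers it.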

Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I found the same problem when using a java8 base image too. I think the issue is that PythonCsvUtils is not permitted to call the package-private constructor of CsvReaderFormat across class loaders.

One workaround I found is to add `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/` to the Dockerfile, so that flink-python-1.18.1.jar is present in both /opt/flink/opt and /opt/flink/lib. Then when Flink tries to classload org.apache.flink.formats.csv.PythonCsvUtils it will be available to the app classloader.

Thanks
Rob Young

On Mon, Jun 17, 2024 at 11:53 PM:

Hello everyone. Does someone know how to solve this, please?

Hello everyone. I get the following error when trying to read a CSV file with the PyFlink DataStream API in a k8s environment using the Flink operator.

###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
    run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
    items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
	at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)
###

Here is my Dockerfile:

###
FROM flink:1.18.1

RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir
RUN alias python=python3
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir
###

Here is my FlinkDeployment custom resource:

###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
  name: myscript
  namespace: flink
spec:
  image: myscript:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/myworkdir/myscript.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
    parallelism: 1
###

Here is myscript.py:

###
import os
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import Types, Row, WatermarkStrategy
import time


def run_flink_job(myfile):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    file_csv_schema = CsvSchema.builder() \
        .add_string_column('column_1') \
        .add_string_column('column_2') \
        .set_column_separator(';') \
        .set_skip_first_data_row(True) \
        .build()
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
    f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
    ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(), 'csv_source').map(lambda x: x)
    ds_f.print()
    env.execute()


if __name__ == '__main__':
    myfile = '%s/src/input/dummy.csv' % os.getcwd()
    run_flink_job(myfile)
###

I tried another Flink deployment that reads a Kafka topic, and it works fine. This problem seems to occur only if I try to read a CSV file.

Here is the CSV file:

###
column_1;column_2
toto;10
tyty;20
tata;30
tutu;40
tete;50
###

I installed the Flink Kubernetes operator as described in the official doc and I'm using minikube on my workstation.

Thank you in advance for your help!