Hello Rob. This workaround does indeed work!
Regards,
Gwenael Le Barzic

From: Robert Young <robertyoun...@gmail.com>
Sent: Tuesday, 18 June 2024 03:54
To: LE BARZIC Gwenael DTSI/SI <gwenael.lebar...@orange.com>
Cc: user@flink.apache.org
Subject: Re: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

Hi Gwenael,

From the logs I thought it was a JVM module opens/exports issue, but I found the same problem with a java8 base image too. I think the issue is that PythonCsvUtils is not permitted to call the package-private constructor of CsvReaderFormat across class loaders.

One workaround I found is to add `RUN cp /opt/flink/opt/flink-python* /opt/flink/lib/` to the Dockerfile, so that flink-python-1.18.1.jar is present in both /opt/flink/opt and /opt/flink/lib. Then, when Flink tries to classload org.apache.flink.formats.csv.PythonCsvUtils, it will be available to the app classloader.

Thanks
Rob Young

On Mon, Jun 17, 2024 at 11:53 PM <gwenael.lebar...@orange.com> wrote:

Hello everyone.

Does someone know how to solve this, please?

Regards,
Gwenael Le Barzic
Big Data technical engineer
Orange/OF/DTSI/SI/DATA-IA/SOLID/CXP
gwenael.lebar...@orange.com

Orange Restricted

From: LE BARZIC Gwenael DTSI/SI
Sent: Friday, 14 June 2024 22:02
To: user@flink.apache.org
Subject: Problem reading a CSV file with pyflink datastream in k8s with Flink operator

Hello everyone.

I get the following error when trying to read a CSV file with pyflink datastream in a k8s environment using the Flink operator.

###
  File "/opt/myworkdir/myscript.py", line 30, in <module>
    run_flink_job(myfile)
  File "/opt/myworkdir/myscript.py", line 21, in run_flink_job
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/formats/csv.py", line 322, in for_schema
    items = list(charFrequency[char].items())
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat.
: java.lang.IllegalAccessError: class org.apache.flink.formats.csv.PythonCsvUtils tried to access method 'void org.apache.flink.formats.csv.CsvReaderFormat.<init>(org.apache.flink.util.function.SerializableSupplier, org.apache.flink.util.function.SerializableFunction, java.lang.Class, org.apache.flink.formats.common.Converter, org.apache.flink.api.common.typeinfo.TypeInformation, boolean)' (org.apache.flink.formats.csv.PythonCsvUtils is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @5d9b7a8a; org.apache.flink.formats.csv.CsvReaderFormat is in unnamed module of loader 'app')
    at org.apache.flink.formats.csv.PythonCsvUtils.createCsvReaderFormat(PythonCsvUtils.java:48)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Unknown Source)
###

Here is my Dockerfile:

###
FROM flink:1.18.1
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN mkdir -p /opt/myworkdir
WORKDIR /opt/myworkdir
RUN alias python=python3
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
COPY src .
RUN chown -R flink:flink /opt/myworkdir
RUN chmod -R 755 /opt/myworkdir
###

Here is my FlinkDeployment custom resource:

###
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  finalizers:
    - flinkdeployments.flink.apache.org/finalizer
  name: myscript
  namespace: flink
spec:
  image: myscript:0.1
  flinkVersion: v1_18
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.metrics.window: 5m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 2m
    job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/myworkdir/myscript.py
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", "/opt/myworkdir/myscript.py"]
    parallelism: 1
###

Here is myscript.py:

###
import os
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.common import Types, Row, WatermarkStrategy
import time

def run_flink_job(myfile):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    file_csv_schema = CsvSchema.builder() \
        .add_string_column('column_1') \
        .add_string_column('column_2') \
        .set_column_separator(';') \
        .set_skip_first_data_row(True) \
        .build()
    csvshem = CsvReaderFormat.for_schema(file_csv_schema)
    f_source = FileSource.for_record_stream_format(csvshem, myfile).build()
    ds_f = env.from_source(f_source, WatermarkStrategy.no_watermarks(), 'csv_source').map(lambda x: x)
    ds_f.print()
    env.execute()

if __name__ == '__main__':
    myfile = '%s/src/input/dummy.csv' % os.getcwd()
    run_flink_job(myfile)
###

I tried another Flink deployment that reads a Kafka topic, and it works fine. This problem seems to occur only when I try to read a CSV file.

Here is the CSV file:

###
column_1;column_2
toto;10
tyty;20
tata;30
tutu;40
tete;50
###

I installed the Flink Kubernetes operator as described in the official doc and I'm using minikube on my workstation.

Thank you in advance for your help!

Regards,
Gwenael Le Barzic

____________________________________________________________________________________________________________
This message and its attachments may contain confidential or privileged information that may be protected by law; they should not be distributed, used or copied without authorisation. If you have received this email in error, please notify the sender and delete this message and its attachments. As emails may be altered, Orange is not liable for messages that have been modified, changed or falsified. Thank you.
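For anyone applying the workaround described in this thread (copying /opt/flink/opt/flink-python* into /opt/flink/lib/), it may help to confirm inside the built image that a flink-python jar really is present in both directories. The following is a minimal sketch and not part of the original exchange: the helper names `find_flink_python_jars` and `workaround_applied` are hypothetical, and the `/opt/flink` layout and the `flink-python*.jar` pattern are assumptions taken from the paths quoted above.

```python
from pathlib import Path


def find_flink_python_jars(flink_home="/opt/flink"):
    """Return the flink-python jar names found under opt/ and lib/.

    Hypothetical helper: the directory layout is assumed from the paths
    quoted in this thread (/opt/flink/opt and /opt/flink/lib).
    """
    home = Path(flink_home)
    found = {}
    for subdir in ("opt", "lib"):
        directory = home / subdir
        if directory.is_dir():
            # Same glob as the `cp` in the workaround, restricted to jars.
            found[subdir] = sorted(p.name for p in directory.glob("flink-python*.jar"))
        else:
            # A missing directory is reported as "no jars found".
            found[subdir] = []
    return found


def workaround_applied(flink_home="/opt/flink"):
    """True when at least one flink-python jar is present in lib/."""
    return bool(find_flink_python_jars(flink_home)["lib"])


if __name__ == "__main__":
    jars = find_flink_python_jars()
    print("opt:", jars["opt"])
    print("lib:", jars["lib"])
    print("workaround applied:", workaround_applied())
```

Run with the image's Python interpreter (for example via `python` in the container), this only inspects the file system; it does not prove that the IllegalAccessError is gone, so submitting the job remains the real test.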