Hi Dan,

I think Arvid and Dawid are right. As a workaround, you can try making the S3 file system work in the client. For a long-term solution, we can fix it.

I created https://issues.apache.org/jira/browse/FLINK-19228 to track this.

Best,
Jingsong

On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Dan,
>
> As far as I checked in the code, the FileSystemSink will try to create
> staging directories from the client. I think it might be problematic, as
> your case shows. We might need to revisit that part. I am cc'ing Jingsong,
> who worked on the FileSystemSink.
>
> As a workaround you might try putting the s3 plugin on the CLI classpath
> (I'm not sure whether plugins work for the CLI through the /plugins
> directory).
>
> Best,
>
> Dawid
>
> On 10/09/2020 22:13, Dan Hill wrote:
>
> This is running on my local minikube and is trying to hit minio.
>
> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> I'm using this Helm chart
>> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>. I
>> start the job by building an image with the job jar and using kubectl
>> apply to do a flink run with the jar.
>>
>> The log4j.properties files on the jobmanager and taskmanager have the
>> debug level set and are pretty embedded in the Helm chart. My
>> log4j-cli.properties is hacked on the CLI side.
>>
>> I thought I just needed the s3 plugins on the jobmanager and
>> taskmanager. Do I need a similar plugin structure in the image where I
>> run 'flink run'?
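Dawid's workaround of giving the client its own plugins directory could be sketched as below. This is a minimal sketch, not a verified fix: it simulates the Flink home in a temp directory so it runs anywhere, and the /opt/flink location and jar file name are assumptions taken from the `ls` output later in this thread.

```shell
# Simulate a Flink home in a temp dir so the sketch is runnable anywhere;
# in the image that runs 'flink run' this would be /opt/flink (assumption).
FLINK_HOME=$(mktemp -d)
mkdir -p "$FLINK_HOME/opt"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar"  # stand-in for the real jar

# The workaround itself: each plugin jar must sit in its own subfolder
# under plugins/, on the machine/image where the CLI executes.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar" \
   "$FLINK_HOME/plugins/s3-fs-hadoop/"

ls -l "$FLINK_HOME/plugins/s3-fs-hadoop"
```

The same layout would apply for flink-s3-fs-presto under plugins/s3-fs-presto if the s3p scheme is used.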
>>
>> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <quietgol...@gmail.com> wrote:
>>
>>> Copying more of the log:
>>>
>>> 2020-09-10 19:50:17,712 INFO org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
>>> 2020-09-10 19:50:17,718 INFO org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>>> 2020-09-10 19:50:17,719 INFO org.apache.flink.client.cli.CliFrontend [] - OS current user: root
>>> 2020-09-10 19:50:17,719 INFO org.apache.flink.client.cli.CliFrontend [] - Current Hadoop/Kerberos user: <no hadoop dependency found>
>>> 2020-09-10 19:50:17,719 INFO org.apache.flink.client.cli.CliFrontend [] - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01
>>> 2020-09-10 19:50:17,719 INFO org.apache.flink.client.cli.CliFrontend [] - Maximum heap size: 2167 MiBytes
>>> tail: log/flink--client-flink-jobmanager-0.log: file truncated
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - JAVA_HOME: /usr/local/openjdk-8
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - No Hadoop Dependency available
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - JVM Options:
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - -Djava.security.properties=/opt/flink/conf/security.properties
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
>>> 2020-09-10 19:50:17,720 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - Program Arguments:
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - list
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - --jobmanager
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - localhost:8081
>>> 2020-09-10 19:50:17,721 INFO org.apache.flink.client.cli.CliFrontend [] - Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
>>> 2020-09-10 19:50:17,722 INFO org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
>>> 2020-09-10 19:50:17,731 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>>> 2020-09-10 19:50:17,732 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
>>> 2020-09-10 19:50:17,732 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
>>> 2020-09-10 19:50:17,732 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
>>> 2020-09-10 19:50:17,732 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
>>> 2020-09-10 19:50:17,733 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
>>> 2020-09-10 19:50:17,733 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
>>> 2020-09-10 19:50:17,733 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
>>> 2020-09-10 19:50:17,733 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
>>> 2020-09-10 19:50:17,733 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
>>> 2020-09-10 19:50:17,734 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
>>> 2020-09-10 19:50:17,734 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
>>> 2020-09-10 19:50:17,734 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
>>> 2020-09-10 19:50:17,734 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
>>> 2020-09-10 19:50:17,734 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
>>> 2020-09-10 19:50:17,735 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
>>> 2020-09-10 19:50:17,736 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
>>> 2020-09-10 19:50:17,737 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
>>> 2020-09-10 19:50:17,737 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
>>> 2020-09-10 19:50:17,802 INFO org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
>>> 2020-09-10 19:50:17,929 INFO org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
>>> 2020-09-10 19:50:18,102 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
>>> 2020-09-10 19:50:18,126 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.
>>> 2020-09-10 19:50:18,161 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
>>> 2020-09-10 19:50:18,163 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
>>> 2020-09-10 19:50:18,226 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
>>> 2020-09-10 19:50:19,107 INFO org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
>>> 2020-09-10 19:50:19,414 INFO org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs
>>>
>>> On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Dan,
>>>>
>>>> somehow enabling debug statements did not work.
>>>>
>>>> However, the logs help to narrow down the issue. The exception occurs
>>>> neither on the jobmanager nor on the taskmanager. It occurs wherever
>>>> you execute the command line interface.
>>>>
>>>> How do you execute the job? Do you start it from your machine? Can you
>>>> try adding the respective s3 plugin there as well?
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>>
>>>>> I changed the levels to DEBUG. I don't see useful data in the logs.
>>>>>
>>>>> https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing
>>>>>
>>>>> On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>>
>>>>>> Could you try 1) or 2), enable debug logging*, and share the log
>>>>>> with us?
>>>>>>
>>>>>> *Usually by adjusting FLINK_HOME/conf/log4j.properties.
>>>>>>
>>>>>> On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <quietgol...@gmail.com> wrote:
>>>>>>
>>>>>>> Ah, sorry, it's a copy/paste issue with this email. I've tried both:
>>>>>>> 1) using the s3a URI with the flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop
>>>>>>> 2) using the s3p URI with the flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto
>>>>>>> 3) loading both 1 and 2
>>>>>>> 4) trying the s3 URI
>>>>>>>
>>>>>>> When doing 1):
>>>>>>>
>>>>>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>>>>>>
>>>>>>> When doing 2):
>>>>>>>
>>>>>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>>>>>>
>>>>>>> etc.
>>>>>>>
>>>>>>> On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Dan,
>>>>>>>>
>>>>>>>> s3p is only provided by the flink-s3-fs-presto plugin. The plugin
>>>>>>>> you used provides s3a (and both provide s3, but it's good to use
>>>>>>>> the more specific prefix).
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> *Background*
>>>>>>>>> I'm converting some prototype Flink v1.11.1 code that uses the
>>>>>>>>> DataSet/DataTable APIs to use the Table API.
>>>>>>>>>
>>>>>>>>> *Problem*
>>>>>>>>> When I switched to the Table API, my s3 plugins stopped working.
>>>>>>>>> I don't know why. I've added the required Maven table dependencies
>>>>>>>>> to the job.
>>>>>>>>>
>>>>>>>>> I've tried moving both the presto and/or the hadoop s3 jars to
>>>>>>>>> plugin subfolders. No luck.
>>>>>>>>>
>>>>>>>>> Any ideas what is wrong? I'm guessing I'm missing something simple.
>>>>>>>>>
>>>>>>>>> *Error*
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>>>>>>>>>
>>>>>>>>> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
>>>>>>>>> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>>>>>>>>> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
>>>>>>>>> at org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)
>>>>>>>>> ... 35 more
>>>>>>>>>
>>>>>>>>> *ls of plugins directory (same for taskmanager)*
>>>>>>>>>
>>>>>>>>> kubectl exec pod/flink-jobmanager-0 -- ls -l /opt/flink/plugins/s3-fs-hadoop
>>>>>>>>> total 19520
>>>>>>>>> -rw-r--r-- 1 root root 19985452 Sep 10 06:27 flink-s3-fs-hadoop-1.11.1.jar
>>>>>>>>
>>>>>>>> --
>>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>> <https://www.ververica.com/>
>>>>>>>> Follow us @VervericaData
>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>> Ververica GmbH Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

--
Best,
Jingsong Lee
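
The exception quoted in this thread also names a second route, fs.allowed-fallback-filesystems. A sketch of the corresponding flink-conf.yaml fragment is below; the s3a values simply mirror the ones shown in the client log earlier in the thread. Note this fallback additionally requires Hadoop on the classpath, which the same log reports as missing, so the plugin approach remains the primary fix here.

```yaml
# flink-conf.yaml -- sketch only, values mirror this thread's minio setup.
# Allow falling back to a Hadoop FileSystem for the s3a scheme
# (the option named in the UnsupportedFileSystemSchemeException).
fs.allowed-fallback-filesystems: s3a
s3a.endpoint: http://minio:9000
s3a.path.style.access: true
s3a.access-key: YOURACCESSKEY
s3a.secret-key: ******
s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
```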