Hi Dan,

As far as I can tell from the code, the FileSystemSink tries to create its staging directories from the client. As your case shows, that might be problematic, and we may need to revisit that part. I am cc'ing Jingsong, who worked on the FileSystemSink.
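Concretely, that means the process invoking `flink run` resolves the sink's staging path itself, so it needs the s3 filesystem plugin too. A rough, untested sketch of the layout the CLI side would need (FLINK_HOME is an assumption, and the jar name just mirrors the 1.11.1 image from your logs; the first few lines only mock up the distribution so the sketch is self-contained):

```shell
# Assumption: FLINK_HOME is the Flink 1.11.1 distribution used by `flink run`.
# For this self-contained sketch we mock it up in a temp dir; on a real machine
# you would skip the mkdir/touch lines and use the existing /opt/flink.
FLINK_HOME="${FLINK_HOME:-$(mktemp -d)}"
mkdir -p "$FLINK_HOME/opt"
touch "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar"   # stands in for the real jar

# The actual step: each filesystem plugin gets its OWN subfolder under plugins/.
mkdir -p "$FLINK_HOME/plugins/s3-fs-hadoop"
cp "$FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar" "$FLINK_HOME/plugins/s3-fs-hadoop/"

ls -l "$FLINK_HOME/plugins/s3-fs-hadoop"
```

Whether the CLI actually picks plugins up from this directory in your deployment is exactly the open question below.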
As a workaround you might try putting the s3 plugin on the CLI classpath (I am not sure whether plugins work for the CLI through the /plugins directory).

Best,
Dawid

On 10/09/2020 22:13, Dan Hill wrote:
> This is running on my local minikube and is trying to hit minio.
>
> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> I'm using this Helm chart:
> <https://github.com/riskfocus/helm-charts-public/tree/master/flink>
> I start the job by building an image with the job jar and using kubectl apply to do a flink run with the jar.
>
> The log4j.properties on the jobmanager and taskmanager have the debug level set and are pretty embedded into the Helm chart. My log4j-cli.properties is hacked on the CLI side.
>
> I thought I just needed the s3 plugins in the jobmanager and taskmanager. Do I need to have a similar plugin structure in the image where I run 'flink run'?
>
> On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> Copying more of the log:
>
> 2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
> 2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - OS current user: root
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - Current Hadoop/Kerberos user: <no hadoop dependency found>
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01
> 2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - Maximum heap size: 2167 MiBytes
>
> tail: log/flink--client-flink-jobmanager-0.log: file truncated
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - JAVA_HOME: /usr/local/openjdk-8
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - No Hadoop Dependency available
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - JVM Options:
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Djava.security.properties=/opt/flink/conf/security.properties
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
> 2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] - Program Arguments:
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    list
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    --jobmanager
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    localhost:8081
> 2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] - Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
> 2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
> 2020-09-10 19:50:17,731 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
> 2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
> 2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
> 2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
> 2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
> 2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
> 2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
> 2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
> 2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
> 2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
> 2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
> 2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
> 2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
> 2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
> 2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
> 2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
> 2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
> 2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
> 2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
> 2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
> 2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
> 2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
> 2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.
> 2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
> 2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
> 2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
> 2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
> 2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs
>
> On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <ar...@ververica.com> wrote:
>
> Hi Dan,
>
> somehow enabling debug statements did not work. However, the log helps to narrow down the issue: the exception occurs neither on the jobmanager nor on the taskmanager. It occurs wherever you execute the command line interface.
>
> How do you execute the job? Do you start it from your machine? Can you try to also add the respective s3 plugin there?
>
> Best,
> Arvid
>
> On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> I changed the levels to DEBUG. I don't see useful data in the logs.
> https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing
>
> On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <ar...@ververica.com> wrote:
>
> Could you try 1) or 2), enable debug logging*, and share the log with us?
>
> *Usually by adjusting FLINK_HOME/conf/log4j.properties.
>
> On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <quietgol...@gmail.com> wrote:
>
> Ah, sorry, it's a copy/paste issue with this email. I've tried all of these:
> 1) using an s3a URI with the flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop.
> 2) using an s3p URI with the flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto.
> 3) loading both 1 and 2.
> 4) trying an s3 URI.
>
> When doing 1):
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
> When doing 2):
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
> etc.
>
> On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise <ar...@ververica.com> wrote:
>
> Hi Dan,
>
> s3p is only provided by the flink-s3-fs-presto plugin. The plugin you used provides s3a (and both provide s3, but it's good to use the more specific prefix).
>
> Best,
> Arvid
>
> On Thu, Sep 10, 2020 at 9:24 AM Dan Hill <quietgol...@gmail.com> wrote:
>
> *Background*
> I'm converting some prototype Flink v1.11.1 code that uses the DataSet/DataTable APIs to use the Table API.
>
> *Problem*
> When I switched to the Table API, my s3 plugins stopped working, and I don't know why. I've added the required Maven table dependencies to the job. I've also tried moving the presto and/or hadoop s3 jars to plugin subfolders. No luck.
>
> Any ideas what is wrong? I'm guessing I'm missing something simple.
>
> *Error*
>
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
> at org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)
> ... 35 more
>
> *ls of plugins directory (same for the taskmanager)*
>
> kubectl exec pod/flink-jobmanager-0 -- ls -l /opt/flink/plugins/s3-fs-hadoop
> total 19520
> -rw-r--r-- 1 root root 19985452 Sep 10 06:27 flink-s3-fs-hadoop-1.11.1.jar
>
> --
> Arvid Heise | Senior Java Developer
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
> Stream Processing | Event Driven | Real Time
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
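For completeness: the exception text in the thread also names an alternative to the plugin approach, namely adding the scheme to fs.allowed-fallback-filesystems so that a Hadoop filesystem on the classpath can serve it. A hedged sketch of that config change follows; the setting is written to a temp file here, whereas on the setup in this thread the real file would be /opt/flink/conf/flink-conf.yaml on the machine running the CLI, and the fallback only helps if a Hadoop s3 implementation is actually on the classpath:

```shell
# Sketch only: append the fallback setting the exception message suggests.
# CONF is a temp stand-in for /opt/flink/conf/flink-conf.yaml.
CONF="$(mktemp)"
cat >> "$CONF" <<'EOF'
fs.allowed-fallback-filesystems: s3;s3a
EOF
cat "$CONF"
```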