Hi Dan,

From what I can see in the code, the FileSystemSink tries to create its
staging directories from the client side. As your case shows, that can
be problematic, and we might need to revisit that part. I am cc'ing
Jingsong, who worked on the FileSystemSink.

As a workaround you might try putting the s3 plugin on the CLI classpath
(I am not sure whether the CLI picks up plugins from the /plugins directory).
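
In case it helps, here is a minimal sketch of the layout the plugin
loader expects on the machine that runs 'flink run'. The paths and the
placeholder jar are illustrative only; on a real install you would copy
the matching flink-s3-fs-presto (or flink-s3-fs-hadoop) jar from the
distribution's opt/ folder instead:

```shell
# Illustrative paths; replace /tmp/flink-cli-demo with your Flink distribution.
FLINK_HOME=/tmp/flink-cli-demo

# Each plugin must live in its own subfolder directly under plugins/.
mkdir -p "$FLINK_HOME/plugins/s3-fs-presto"

# On a real install, copy the real jar (version must match the cluster, 1.11.1 here):
#   cp "$FLINK_HOME/opt/flink-s3-fs-presto-1.11.1.jar" "$FLINK_HOME/plugins/s3-fs-presto/"
# Placeholder so this sketch is self-contained:
touch "$FLINK_HOME/plugins/s3-fs-presto/flink-s3-fs-presto-1.11.1.jar"

ls "$FLINK_HOME/plugins/s3-fs-presto"
```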

Best,

Dawid

On 10/09/2020 22:13, Dan Hill wrote:
> This is running on my local minikube and is trying to hit minio.
>
> On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <quietgol...@gmail.com
> <mailto:quietgol...@gmail.com>> wrote:
>
>     I'm using this Helm chart
>     <https://github.com/riskfocus/helm-charts-public/tree/master/flink>. 
>     I start the job by building an image with the job jar and using
>     kubectl apply to do a flink run with the jar.
>
>     The log4j.properties on jobmanager and taskmanager have debug
>     level set and are pretty embedded into the Helm chart.  My
>     log4j-cli.properties is hacked on the CLI side.
>
>     I thought I just needed the s3 plugins in the jobmanager and
>     taskmanager.  Do I need to have a similar plugin structure from
>     the image where I run 'flink run'?
>
>
>     On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <quietgol...@gmail.com
>     <mailto:quietgol...@gmail.com>> wrote:
>
>         Copying more of the log
>
>         2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
>         2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend [] -  Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
>         2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] -  OS current user: root
>         2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>
>         2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01
>         2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] -  Maximum heap size: 2167 MiBytes
>         tail: log/flink--client-flink-jobmanager-0.log: file truncated
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -  JAVA_HOME: /usr/local/openjdk-8
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -  No Hadoop Dependency available
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -  JVM Options:
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -     -Djava.security.properties=/opt/flink/conf/security.properties
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -     -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
>         2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -     -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -     -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -  Program Arguments:
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -     list
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -     --jobmanager
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -     localhost:8081
>         2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -  Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
>         2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
>         2020-09-10 19:50:17,731 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
>         2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
>         2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
>         2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
>         2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
>         2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
>         2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
>         2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
>         2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
>         2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
>         2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
>         2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
>         2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
>         2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
>         2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
>         2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
>         2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
>         2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
>         2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
>         2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
>         2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
>         2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
>         2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.
>         2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
>         2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
>         2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
>         2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
>         2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs
>
>
>
>         On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise
>         <ar...@ververica.com <mailto:ar...@ververica.com>> wrote:
>
>             Hi Dan,
>
>             somehow enabling debug statements did not work.
>
>             However, the log still helps to narrow down the issue. The
>             exception occurs neither on the jobmanager nor on the
>             taskmanager; it occurs wherever you execute the command
>             line interface.
>
>             How do you execute the job? Do you start it from your
>             machine? Can you try adding the respective s3 plugin
>             there as well?
>
>             Best,
>
>             Arvid
>
>             On Thu, Sep 10, 2020 at 7:50 PM Dan Hill
>             <quietgol...@gmail.com <mailto:quietgol...@gmail.com>> wrote:
>
>                 I changed the levels to DEBUG.  I don't see useful
>                 data in the logs.
>
>                 
> https://drive.google.com/file/d/1ua1zsr3BInY_8xdsWwA__F0uloAqy-vG/view?usp=sharing
>
>                 On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise
>                 <ar...@ververica.com <mailto:ar...@ververica.com>> wrote:
>
>                     Could you try 1) or 2) and enable debug logging*
>                     and share the log with us?
>
>                     *Usually by adjusting
>                     FLINK_HOME/conf/log4j.properties.
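>
>                     A minimal sketch of that adjustment, assuming the
>                     Log4j 2 properties format that Flink 1.11 ships
>                     with (check your file's existing keys before
>                     copying):
>
>                     ```
>                     # FLINK_HOME/conf/log4j.properties (illustrative)
>                     rootLogger.level = DEBUG
>                     ```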
>
>                     On Thu, Sep 10, 2020 at 5:38 PM Dan Hill
>                     <quietgol...@gmail.com
>                     <mailto:quietgol...@gmail.com>> wrote:
>
>                         Ah, sorry, it's a copy/paste issue with this
>                         email.  I've tried the following:
>                         1) using an s3a uri with the flink-s3-fs-hadoop
>                         jar in /opt/flink/plugins/s3-fs-hadoop.
>                         2) using an s3p uri with the flink-s3-fs-presto
>                         jar in /opt/flink/plugins/s3-fs-presto.
>                         3) loading both 1 and 2.
>                         4) trying an s3 uri.
>
>                         When doing 1)
>
>                         Caused by:
>                         
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>                         Could not find a file system implementation
>                         for scheme 's3a'. The scheme is directly
>                         supported by Flink through the following
>                         plugin: flink-s3-fs-hadoop. Please ensure that
>                         each plugin resides within its own subfolder
>                         within the plugins directory. See
>                         
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>                         for more information. If you want to use a
>                         Hadoop file system for that scheme, please add
>                         the scheme to the configuration
>                         fs.allowed-fallback-filesystems. For a full
>                         list of supported file systems, please see
>                         
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>
>                         When doing 2)
>
>                         Caused by:
>                         
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>                         Could not find a file system implementation
>                         for scheme 's3p'. The scheme is directly
>                         supported by Flink through the following
>                         plugin: flink-s3-fs-presto. Please ensure that
>                         each plugin resides within its own subfolder
>                         within the plugins directory. See
>                         
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>                         for more information. If you want to use a
>                         Hadoop file system for that scheme, please add
>                         the scheme to the configuration
>                         fs.allowed-fallback-filesystems. For a full
>                         list of supported file systems, please see
>                         
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>
>                         etc
>
>                         On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise
>                         <ar...@ververica.com
>                         <mailto:ar...@ververica.com>> wrote:
>
>                             Hi Dan,
>
>                             s3p is only provided by the
>                             flink-s3-fs-presto plugin. The plugin you
>                             used provides s3a. (Both also provide s3,
>                             but it's good to use the more specific
>                             prefix.)
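>
>                             As the exception text also notes, the
>                             alternative to the plugin mechanism is a
>                             fallback entry in flink-conf.yaml. A
>                             sketch, which only applies when a Hadoop
>                             S3A implementation is on the classpath
>                             (the CLI log above shows it is not, in
>                             this setup):
>
>                             ```
>                             # flink-conf.yaml (illustrative)
>                             fs.allowed-fallback-filesystems: s3a
>                             ```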
>
>                             Best,
>
>                             Arvid
>
>                             On Thu, Sep 10, 2020 at 9:24 AM Dan Hill
>                             <quietgol...@gmail.com
>                             <mailto:quietgol...@gmail.com>> wrote:
>
>                                 *Background*
>                                 I'm converting some prototype Flink
>                                 v1.11.1 code that uses
>                                 DataSet/DataTable APIs to use the
>                                 Table API.
>
>                                 *Problem*
>                                 When switching to using the Table API,
>                                 my s3 plugins stopped working.  I
>                                 don't know why.  I've added the
>                                 required maven table dependencies to
>                                 the job.
>
>                                 I've tried moving the presto and/or
>                                 the hadoop s3 jars to plugin
>                                 subfolders.  No luck.
>
>                                 Any ideas what is wrong?  I'm guessing
>                                 I'm missing something simple.
>
>
>                                 *Error*
>
>                                 Caused by:
>                                 
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>                                 Could not find a file system
>                                 implementation for scheme 's3p'. The
>                                 scheme is directly supported by Flink
>                                 through the following plugin:
>                                 flink-s3-fs-presto. Please ensure that
>                                 each plugin resides within its own
>                                 subfolder within the plugins
>                                 directory. See
>                                 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html
>                                 for more information. If you want to
>                                 use a Hadoop file system for that
>                                 scheme, please add the scheme to the
>                                 configuration
>                                 fs.allowed-fallback-filesystems. For a
>                                 full list of supported file systems,
>                                 please see
>                                 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>
>                                 at
>                                 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
>
>                                 at
>                                 
> org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
>
>                                 at
>                                 
> org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
>
>                                 at
>                                 
> org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)
>
>                                 ... 35 more
>
>
>                                 *ls of plugins directory (same for
>                                 taskmanager)*
>
>                                 kubectl exec pod/flink-jobmanager-0 
>                                 -- ls -l /opt/flink/plugins/s3-fs-hadoop
>
>                                 total 19520
>
>                                 -rw-r--r-- 1 root root 19985452 Sep 10
>                                 06:27 flink-s3-fs-hadoop-1.11.1.jar
>
>
>
>
>
>                             --
>
>                             Arvid Heise | Senior Java Developer
>
>                             <https://www.ververica.com/>
>
>                             Follow us @VervericaData
>
>                             --
>
>                             Join Flink Forward
>                             <https://flink-forward.org/> - The Apache
>                             Flink Conference
>
>                             Stream Processing | Event Driven | Real Time
>
>                             --
>
>                             Ververica GmbH | Invalidenstrasse 115,
>                             10115 Berlin, Germany
>
>                             --
>
>                             Ververica GmbH, Registered at Amtsgericht
>                             Charlottenburg: HRB 158244 B. Managing
>                             Directors: Timothy Alexander Steinert, Yip
>                             Park Tung Jason, Ji (Toni) Cheng
>
>
>
>
>
>
