Hi,
I want to use the Flink SQL filesystem connector to read ORC files from S3 on
Flink 1.13. My table definition looks like this:

create or replace table xxx
 (..., startdate string)
 partitioned by (startdate)
 with ('connector'='filesystem', 'format'='orc', 'path'='s3://xxx/orc/yyy')

I followed Flink's S3 guide and installed the S3 libs as a plugin. I use MinIO
as the S3 provider, and it works for Flink's checkpoints and HA files.
The SQL connector also works when I use the CSV or Avro formats. The problems
start with ORC.

1. If I just put flink-orc on the job's classpath, I get this error on the JobManager:
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
        at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
        at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
        at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]

2. I managed to put the Hadoop common libs on the classpath with this Maven setup:

                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-orc_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <exclusions>
                                <exclusion>
                                        <groupId>org.apache.orc</groupId>
                                        <artifactId>orc-core</artifactId>
                                </exclusion>
                        </exclusions>
                </dependency>
                <dependency>
                        <groupId>org.apache.orc</groupId>
                        <artifactId>orc-core</artifactId>
                        <version>1.5.6</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.orc</groupId>
                        <artifactId>orc-shims</artifactId>
                        <version>1.5.6</version>
                </dependency>
                <dependency>
                        <groupId>net.java.dev.jets3t</groupId>
                        <artifactId>jets3t</artifactId>
                        <version>0.9.0</version>
                </dependency>

Now the job is accepted by the JobManager, but execution fails for lack of AWS
credentials:
Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret 
Access Key must be specified as the username or password (respectively) of a s3 
URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey 
properties (respectively).
        at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
        at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy76.initialize(Unknown Source)
        at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
        at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
        at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
        at org.apache.orc.OrcFile.createReader(OrcFile.java:343)

I guess that the ORC reader tries to recreate the S3 filesystem in the job's
classloader and cannot use the credentials from flink-conf.yaml. However, I can
see in the logs that the job earlier managed to list the files on MinIO:

2021-08-14 09:35:48,285 INFO  org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607)  hosts=[localhost] ID=0000000002 position=null]


So I think the issue is in the ORC reader when it tries to read a specific file.

Any ideas how I can modify the setup or pass the credentials to Jets3t?
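
For reference, the error message above names two Hadoop properties. Below is a
minimal sketch of how they might be set, assuming a Hadoop core-site.xml that
the job's classloader can see. That is an assumption on my side: I have not
verified that the ORC reader picks such a file up, or that this alone is enough
for a custom endpoint such as MinIO. The values are placeholders.

```xml
<!-- Hypothetical core-site.xml fragment; the property names are taken from the error message -->
<configuration>
  <property>
    <name>fs.s3.awsAccessKeyId</name>
    <value>PLACEHOLDER_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3.awsSecretAccessKey</name>
    <value>PLACEHOLDER_SECRET_KEY</value>
  </property>
</configuration>
```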

Regards,
Piotr
