Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, fs.s3a.secret.key)
- setting the HADOOP_CONF_DIR env variable to point to the directory containing core-site.xml
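For anyone hitting the same problem, a core-site.xml with those properties looks roughly like this (the endpoint and keys are placeholders, not my actual values, and SimpleAWSCredentialsProvider is just one example of a provider that reads the access/secret key properties - adjust to your setup):

<configuration>
  <!-- MinIO / S3-compatible endpoint (placeholder host and port) -->
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio.example.local:9000</value>
  </property>
  <!-- MinIO usually needs path-style access instead of virtual-hosted bucket names -->
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <!-- Example provider that reads fs.s3a.access.key / fs.s3a.secret.key -->
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>PLACEHOLDER_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>PLACEHOLDER_SECRET_KEY</value>
  </property>
</configuration>

HADOOP_CONF_DIR then just has to point at the directory holding this file, e.g. export HADOOP_CONF_DIR=/path/to/hadoop-conf (placeholder path).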
Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek <d...@apache.org> wrote:
> Hi Piotr,
>
> unfortunately this is a known long-standing issue [1]. The problem is that the ORC format is not using Flink's filesystem abstraction for the actual reading of the underlying file, so you have to adjust your Hadoop config accordingly. There is also a corresponding SO question [2] covering this.
>
> I think a proper fix would actually require changing the interface on the ORC side, because currently there seems to be no easy way to switch the FS implementation (I've just quickly checked the OrcFile class, so this might not be 100% accurate).
>
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
>
> Best,
> D.
>
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski <p...@touk.pl> wrote:
>
> > Hi,
> > I want to use the Flink SQL filesystem connector to read ORC files via the S3 filesystem on Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> >   (..., startdate string)
> >   partitioned by (startdate)
> >   with ('connector'='filesystem', 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed the S3 libs as a plugin. I have MinIO as the S3 provider and it works for Flink's checkpoints and HA files. The SQL connector also works when I use CSV or Avro formats. The problems start with ORC.
> >
> > 1. If I just put flink-orc on the job's classpath, I get this error on the JobManager:
> >
> > Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
> >     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121) ~[?:?]
> >     at org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88) ~[?:?]
> >     at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put the Hadoop common libs on the classpath with this Maven setup:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >   <version>${flink.version}</version>
> >   <exclusions>
> >     <exclusion>
> >       <groupId>org.apache.orc</groupId>
> >       <artifactId>orc-core</artifactId>
> >     </exclusion>
> >   </exclusions>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.orc</groupId>
> >   <artifactId>orc-core</artifactId>
> >   <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >   <groupId>org.apache.orc</groupId>
> >   <artifactId>orc-shims</artifactId>
> >   <version>1.5.6</version>
> > </dependency>
> > <dependency>
> >   <groupId>net.java.dev.jets3t</groupId>
> >   <artifactId>jets3t</artifactId>
> >   <version>0.9.0</version>
> > </dependency>
> >
> > Now the job is accepted by the JobManager, but execution fails due to missing AWS credentials:
> >
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
> >     at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> >     at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> >     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> >     at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> >     at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> >     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> >     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> >     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> >     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2449)
> >     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:367)
> >     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:287)
> >     at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
> >     at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
> >     at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
> >
> > I guess that the ORC reader tries to recreate the S3 filesystem in the job's classloader and cannot use the credentials from flink-conf.yaml. However, I can see in the logs that it earlier managed to list the files on MinIO:
> >
> > 2021-08-14 09:35:48,285 INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner [] - Assigning remote split to requesting host '172': Optional[FileSourceSplit: s3://xxx/orc/yyy/startdate=2021-08-10/3cf3afae-1050-4591-a5af-98d231879687.orc [0, 144607) hosts=[localhost] ID=0000000002 position=null]
> >
> > So I think the issue is in the ORC reader when it tries to read the specific file.
> >
> > Any ideas how I can modify the setup or pass the credentials to Jets3t?
> >
> > Regards,
> > Piotr
> >