Hi Yun Gao,

Thank you for your prompt response.

I've changed the table 'format' from 'parquet' to 'raw' as in your example
and I've been able to access the file:

Job has been submitted with JobID 441e7518bb615109624c1f33f222475b
+--------------------------------+
|                            url |
+--------------------------------+
|AR1 ����
| 2021-05-16 ���� ... |
|                   2021-05-16 |
| 2021-05-16 ... |

So it seems the problem is linked with the parquet dependencies. This is my
POM:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project
        xmlns="http://maven.apache.org/POM/4.0.0";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.2</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>
    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            
<url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not
be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Add connector dependencies here. They must be in the
default scope (compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 
https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core
-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Add logging framework, to produce console output when
running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR
by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar
that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if
your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>

<exclude>org.apache.flink:force-shading</exclude>

<exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>

<exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in
the META-INF folder.
                                    Otherwise, this might cause
SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

<mainClass>org.apache.flink.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in
Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>


Another interesting fact is when exporting the access and secret key as env
variables and adding:

fs.s3a.aws.credentials.provider:
com.amazonaws.auth.EnvironmentVariableCredentialsProvider

to flink-conf.yaml I'm able to read and decode the parquet file properly:

Job has been submitted with JobID 15453b8be2bddcb49c1141a01013bf81
+--------------------------------+-------------+
|                           date |       value |
+--------------------------------+-------------+
|                     2021-05-16 |           7 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           1 |
|                     2021-05-16 |           3 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           9 |
|                     2021-05-16 |           8 |
|                     2021-05-16 |           7 |
|                     2021-05-16 |           9


We won't be able to set env variables in production, so this is not a valid
workaround for us.

Please have a look to the POM so you can tell a dependency miss or misuse
of some sort.

Thank you very much.

Angelo.


On Thu, May 20, 2021 at 11:54 AM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Angelo,
>
> I tried the fail case provied with a similar one:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings
>  settings = 
> EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();TableEnvironment
>  t_env = 
> TableEnvironment.create(settings);t_env.getConfig().getConfiguration().setString("parallelism.default",
>  "1");t_env.executeSql("CREATE TABLE example (  `url` STRING) WITH ( 
> 'connector' = 'filesystem', 'path' = 's3a://whatnamedoyouwant/links', 
> 'format' = 'raw')");Table t1 = t_env.from("example");t1.execute().print();*
>
>
> However, it seems the job could be executed successfully.
>
> I further tried with the configuration, and found that the exception
> is thrown if there is no s3a.access-key or s3a.secret-key
> configured. Could you have a look at if the two configuration items
> are effective ?
>
> Also I only configured the s3a.path-style: true, s3a.access-key and
> s3a.secret-key, is it possible to remove the other configuration items
> and have a try ?
>
> Best,
> Yun
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Angelo G. <angelo.guas...@gmail.com>
> *Send Date:*Wed May 19 00:24:42 2021
> *Recipients:*Flink User Mail List <user@flink.apache.org>
> *Subject:*Issue reading from S3
>
>> Hi,
>>
>> I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting
>> the job to local cluster (tar.gz distribution). I do not have a Hadoop
>> installation running in the same machine. S3 (not Amazon) is running in a
>> remote location and I have access to it via endpoint and access/secret keys.
>>
>> The issue is that I'm able to read and write from and to S3 when
>> using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText
>> methods but I can't read from S3 when using the table API.
>>
>> This is the application:
>>
>> package org.apache.flink;import org.apache.flink.core.fs.FileSystem;import 
>> org.apache.flink.streaming.api.datastream.DataStream;import 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
>> org.apache.flink.table.api.EnvironmentSettings;import 
>> org.apache.flink.table.api.Table;import 
>> org.apache.flink.table.api.TableEnvironment;public class ReadTables {    
>> public static void main(String[] args) throws Exception {        // CLASSIC 
>> API (PLAIN TEXT)        StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();        
>> DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");        
>> ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);     
>>    env.execute();        // TABLE API        EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();   
>>      TableEnvironment t_env = TableEnvironment.create(settings);        
>> t_env.getConfig().getConfiguration().setString("parallelism.default", "1");  
>>             t_env.executeSql("CREATE TABLE example (  `date` STRING,  
>> `value` INT) WITH ( 'connector' = 'filesystem', 'path' = 
>> 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");        Table t1 = 
>> t_env.from("example");        t1.execute().print();    }}
>>
>>
>> The first job works properly, reading the source.txt file and writing it
>> to dest.txt.
>>
>> The second job does not work:
>>
>> $~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c
>> org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;
>>
>> Job has been submitted with JobID c690f2222aed0051d1501d5b9747b56f
>> Program execution finished
>> Job with JobID c690f2222aed0051d1501d5b9747b56f has finished.
>> Job Runtime: 17358 ms
>>
>> Job has been submitted with JobID ebe54017faa83af33923d50892283e11
>> +--------------------------------+-------------+
>> |                           date |       value |
>> +--------------------------------+-------------+
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Failed to fetch next result
>> Caused by: java.lang.RuntimeException: Failed to fetch next result
>> Caused by: java.io.IOException: Failed to fetch job execution result
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: ebe54017faa83af33923d50892283e11)
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> Job failed (JobID: ebe54017faa83af33923d50892283e11)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
>> by NoRestartBackoffTimeStrategy
>> Caused by: java.lang.RuntimeException: One or more fetchers have
>> encountered exception
>> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
>> unexpected exception while polling the records
>> Caused by: java.net.SocketTimeoutException: doesBucketExist on
>> scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS
>> Credentials provided by BasicAWSCredentialsProvider
>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>> com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>> Caused by: com.amazonaws.AmazonClientException: No AWS Credentials
>> provided by BasicAWSCredentialsProvider
>> EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
>> com.amazonaws.SdkClientException: Failed to connect to service endpoint:
>> Caused by: com.amazonaws.SdkClientException: Failed to connect to service
>> endpoint:
>> Caused by: java.net.SocketTimeoutException: connect timed out
>>
>> I have the access credentials configured in flink-conf.yaml file:
>>
>> s3a.endpoint: http://s3.xxxxxxx
>> s3a.path-style: true
>> s3a.access-key: xxxxxxxxx
>> s3a.secret-key: xxxxxxxxx
>> s3a.entropy.key: _entropy_
>> s3a.entropy.length: 4
>> s3a.region: s3
>> s3a.bucket: xxxxxxxxx
>>
>> I copied the flink-s3-fs-hadoop jar in the plugins folder but I had to
>> add it as a dependency (not provided) to the pom, otherwise a S3AFileSystem
>> 'class not found' exception arises.
>>
>> Thank you for your help,
>>
>> Angelo.
>>
>>
>>

Reply via email to