
James Kim commented on FLINK-24286:

Yep, that did the trick. However, now I'm getting a different issue regarding 
s3a endpoint.

When I build and run directly from the IDE, I get an error saying a couple 
- INFO: Error when creating PropertyDescriptor for public final void 
 Ignoring this property.
- com.amazonaws.SdkClientException: Failed to connect to service endpoint: 
- Caused by: java.net.NoRouteToHostException: No route to host (Host 
- Caused by: java.net.SocketTimeoutException: connect timed out


I think the issue is the s3 configurations that I've added in the pom.xml are 
not taken into account. Is pom.xml the correct place to add the s3a access key, 
s3a secret key, etc? I wasn't able to find information on the docs.


The pom.xml that I have now looks like the following:
<project xmlns="http://maven.apache.org/POM/4.0.0";











https://mvnrepository.com/artifact/org.apache.flink/flink-s3-fs-hadoop -->



> Flink TableEnvironment executesql on IntelliJ leads to Could not find a file 
> system implementation for scheme 's3a'
> -------------------------------------------------------------------------------------------------------------------
>                 Key: FLINK-24286
>                 URL: https://issues.apache.org/jira/browse/FLINK-24286
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Client
>    Affects Versions: 1.13.2
>         Environment: Ubuntu 18.04
>            Reporter: James Kim
>            Priority: Critical
> I'm trying to use the Table API in a simple Java class to create tables, run 
> queries, retrieve the results and use that data for computation. The data is 
> a CSV file from s3a (S3 compatible storage).
> When I open a terminal tab, start the cluster (standalone) in the flink 
> directory, and on another tab for Flink SQL client embedded and run queries 
> it works fine. I have the proper confs in conf/flink-conf.yaml. 
> However, now i'm tyring to do this programmatically from code so I created a 
> separate project directory on IntelliJ but when I run the program, I get the 
> following error:
> "Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.JobException: Creating the input splits caused an 
> error: Could not find a file system implementation for scheme 's3a'. The 
> scheme is directly supported by Flink through the following plugin: 
> flink-s3-fs-hadoop. Please ensure that each plugin resides within its own 
> subfolder within the plugins directory. See 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for 
> more information. If you want to use a Hadoop file system for that scheme, 
> please add the scheme to the configuration fs.allowed-fallback-filesystems."
> I've seen and fixed this error when running on the terminal but I run the 
> Main class directly from IntelliJ, I get the above error.
> Is there a way to configure the Main class to read from the flink-conf.yaml 
> file which is in a different path?
> Main.java:
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
> public class Main {
>     public static void main(String[] args) {
>         // create a TableEnvironment for batch or streaming execution
>         EnvironmentSettings settings = EnvironmentSettings
>                 .newInstance()
>                 .inBatchMode()
>                 .build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         // create an input Table
>         TableResult tempResult = tableEnv.executeSql(
> //                "create temporary table ATHLETES (\n" +
>                         "create table ATHLETES (\n" +
>                 "name varchar,\n" +
>                 "country varchar,\n" +
>                 "sport varchar\n" +
>                 ") with (\n" +
>                 "'connector' = 'filesystem',\n" +
> "'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv',\n"
>  +
>                 "'format'='csv'\n" +
>                 ")\n");
>         TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
> }{code}
> pom.xml:
> {code:java}
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0";
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>     <modelVersion>4.0.0</modelVersion>
>     <groupId>groupId</groupId>
>     <artifactId>flink-ecs-sample</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <properties>
>         <maven.compiler.source>8</maven.compiler.source>
>         <maven.compiler.target>8</maven.compiler.target>
>     </properties>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner-blink_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-scala_2.11</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-common</artifactId>
>             <version>1.13.2</version>
>             <scope>compile</scope>
> <!--            <scope>provided</scope>-->
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-csv</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_2.11</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>     </dependencies>
> </project>
> {code}

This message was sent by Atlassian Jira

Reply via email to