James Kim created FLINK-24286:
---------------------------------

             Summary: Flink TableEnvironment executeSql on IntelliJ leads to Could not find a file system implementation for scheme 's3a'
                 Key: FLINK-24286
                 URL: https://issues.apache.org/jira/browse/FLINK-24286
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Client
    Affects Versions: 1.13.2
         Environment: Ubuntu 18.04
            Reporter: James Kim
I'm trying to use the Table API in a simple Java class to create tables, run queries, retrieve the results, and use that data for computation. The data is a CSV file read from s3a (S3-compatible storage).

When I open a terminal tab, start the standalone cluster from the flink directory, and run queries from the embedded SQL Client in another tab, everything works fine; I have the proper settings in conf/flink-conf.yaml. However, now I'm trying to do this programmatically from code, so I created a separate project directory in IntelliJ, but when I run the program I get the following error:

"Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems."

I've seen and fixed this error when running from the terminal, but when I run the Main class directly from IntelliJ, I get the error above. Is there a way to configure the Main class to read from the flink-conf.yaml file, which is in a different path?
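One thing I considered is loading the configuration and initializing the file systems manually before creating the TableEnvironment, since the CLI appears to do something like this on startup. This is only an untested sketch on my part, and the conf directory path below is just an example for my setup:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;

// load flink-conf.yaml from an explicit directory (example path, adjust to your install)
Configuration conf = GlobalConfiguration.loadConfiguration("/path/to/flink/conf");

// register file systems (including s3a from the plugins folder) before the job is built
FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf));
{code}

I haven't verified whether this is the intended way to pick up flink-conf.yaml from inside an IDE-launched main().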
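I also wondered whether the missing piece when running from the IDE is simply the s3 filesystem jar itself, since there is no plugins directory on the IDE classpath. Adding it as a regular Maven dependency is a workaround I've seen suggested, though I'm not sure it's the recommended approach:

{code:xml}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>1.13.2</version>
</dependency>
{code}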
Main.java:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
                // "create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}

pom.xml:
{code:xml}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>flink-ecs-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope> -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
    </dependencies>
</project>
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)