James Kim created FLINK-24285:
---------------------------------

             Summary: Flink Table API Could not find any format factory for identifier 'csv' in the classpath.
                 Key: FLINK-24285
                 URL: https://issues.apache.org/jira/browse/FLINK-24285
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Client
    Affects Versions: 1.13.2
         Environment: Ubuntu 18.04
            Reporter: James Kim

I'm trying to read a CSV file from S3-compatible storage over the s3a protocol from Java code. This is the main class that I have:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class Main {
    public static void main(String[] args) {
        // create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // create an input Table
        TableResult tempResult = tableEnv.executeSql(
//                "create temporary table ATHLETES (\n" +
                "create table ATHLETES (\n" +
                "name varchar,\n" +
                "country varchar,\n" +
                "sport varchar\n" +
                ") with (\n" +
                "'connector' = 'filesystem',\n" +
                "'path'='s3a://testbucket/expFolder/2020_Tokyo_Olympics/Athletes.csv',\n" +
                "'format'='csv'\n" +
                ")\n");

        // query the table; this is the call that fails
        TableResult table2 = tableEnv.executeSql("select * from ATHLETES");
    }
}
{code}

However, when I run this code, I get an exception at the executeSql call that runs the select query. The error log is the following:

{code:java}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.ATHLETES'.

Table options are:

'connector'='filesystem'
'format'='csv'
'path'='s3a://testbucket/james_experiment/2020_Tokyo_Olympics/Athletes.csv'
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
	at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
	at Main.main(Main.java:31)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'csv' in the classpath.
	at org.apache.flink.table.filesystem.FileSystemTableSource.<init>(FileSystemTableSource.java:97)
	at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:74)
	at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
	... 19 more
{code}

The exception says that it is unable to create a source for reading table 'default_catalog.default_database.ATHLETES', and the root cause further down is "Could not find any format factory for identifier 'csv' in the classpath."

The pom.xml I have is the following:

{code:java}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>groupId</groupId>
    <artifactId>flink-ecs-sample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.2</version>
            <scope>compile</scope>
            <!-- <scope>provided</scope>-->
        </dependency>
    </dependencies>
</project>
{code}

I'm going through the docs ([https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/common/]) but I'm not sure why I'm getting this error. Could I get some help?
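
One way to narrow this down, offered as a sketch under assumptions rather than something taken from the report: Flink looks up table factories through Java's ServiceLoader mechanism, so listing the providers registered for the `org.apache.flink.table.factories.Factory` interface shows whether any CSV format factory is visible on the classpath. The class name `FactoryProbe` and its `discover` helper are hypothetical and exist only for illustration. The sketch uses only JDK classes and loads the Flink interface by name, so it also compiles and runs (finding nothing) when Flink is absent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class FactoryProbe {
    // Flink's table factory SPI interface, referenced by name so that
    // this class compiles without Flink on the compile classpath.
    static final String FACTORY_SPI = "org.apache.flink.table.factories.Factory";

    // Returns the class names of all providers registered for the given SPI,
    // or an empty list when the SPI interface itself is not on the classpath.
    static List<String> discover(String spiClassName) {
        List<String> found = new ArrayList<>();
        try {
            Class<?> spi = Class.forName(spiClassName);
            for (Object provider : ServiceLoader.load(spi)) {
                found.add(provider.getClass().getName());
            }
        } catch (ClassNotFoundException e) {
            // SPI interface missing: nothing can be discovered.
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> factories = discover(FACTORY_SPI);
        if (factories.isEmpty()) {
            System.out.println("No table factories found for " + FACTORY_SPI);
        }
        for (String name : factories) {
            System.out.println(name);
        }
    }
}
```

Run with the same classpath as `Main`. If none of the listed classes is a CSV format factory, the format module is probably missing from the dependencies. The pom.xml above declares no CSV format artifact (for Flink 1.13.2 that would presumably be `org.apache.flink:flink-csv`), which would be consistent with the "Could not find any format factory for identifier 'csv'" message.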