Hi, Is CSV format supported for Kafka in Flink 1.10? It says I need to specify connector.type as Filesystem but documentation says it is supported for Kafka?
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.types.Row; public class Test { public static void main(String... args) throws Exception { EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecutionEnvironment.setStateBackend((StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb")); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment, settings); tableEnvironment .connect( new Kafka() .version("0.11") .topic("test-topic1") ) .withFormat(new Csv()) .withSchema(new Schema().field("f0", DataTypes.STRING())) .inAppendMode() .createTemporaryTable("kafka_source"); Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source"); tableEnvironment.toAppendStream(resultTable, Row.class).print(); tableEnvironment.execute("Sample Job"); } } This code generates the following error Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvAppendTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.property-version=1 connector.topic=test-topic1 connector.type=kafka connector.version=0.11 format.property-version=1 format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=f0 update-mode=append The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52) ... 34 more