Hi Kant,

The CSV format is supported for Kafka, but you need to download the flink-csv SQL jar [1] and load it into the SQL CLI using `--library` (pointing at the directory that contains the jar). The CSV format factory is implemented in a separate module and is not bundled by default.
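For a Java program such as the one quoted below, the same jar would presumably have to be on the program's classpath rather than passed to the SQL CLI. A quick way to check is to try loading the factory class by name. This is only a minimal sketch: the class name `org.apache.flink.formats.csv.CsvRowFormatFactory` is an assumption about what flink-csv 1.10 ships, and `ClasspathCheck` and `isOnClasspath` are hypothetical helper names.

```java
public class ClasspathCheck {

    // Assumed name of the CSV format factory in flink-csv 1.10; adjust for your version.
    static final String CSV_FACTORY = "org.apache.flink.formats.csv.CsvRowFormatFactory";

    /** Returns true if the named class can be located on the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("flink-csv on classpath: " + isOnClasspath(CSV_FACTORY));
    }
}
```

If this prints `false` when run with the same classpath as the job, the CSV format module is most likely missing from the dependencies.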
[1]: https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar

On Sun, 1 Mar 2020 at 03:48, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> Is CSV format supported for Kafka in Flink 1.10? It says I need to specify
> connector.type as Filesystem but documentation says it is supported for
> Kafka?
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.Row;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend(
>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>             .connect(
>                 new Kafka()
>                     .version("0.11")
>                     .topic("test-topic1")
>             )
>             .withFormat(new Csv())
>             .withSchema(new Schema().field("f0", DataTypes.STRING()))
>             .inAppendMode()
>             .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
> This code generates the following error
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
>
> The following properties are requested:
> connector.property-version=1
> connector.topic=test-topic1
> connector.type=kafka
> connector.version=0.11
> format.property-version=1
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=f0
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>     at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>     at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
>     ... 34 more
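
The "Required context properties mismatch" in the trace above can be read as: the only CSV factories found on the classpath require `connector.type=filesystem`, while the request says `kafka`. The sketch below is a simplified model of that comparison, not Flink's actual `TableFactoryService` code. `ContextMatchSketch` and `mismatches` are hypothetical names, and the required context given for the filesystem CSV factory is an assumption inferred from the error message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ContextMatchSketch {

    /**
     * Returns the keys whose required value differs from the requested one.
     * An empty result means the factory's required context matches the request.
     */
    static Map<String, String> mismatches(Map<String, String> required,
                                          Map<String, String> requested) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : required.entrySet()) {
            String actual = requested.get(e.getKey());
            if (!e.getValue().equals(actual)) {
                result.put(e.getKey(),
                        "expects '" + e.getValue() + "', but is '" + actual + "'");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A subset of the requested properties listed in the error message
        Map<String, String> requested = new LinkedHashMap<>();
        requested.put("connector.type", "kafka");
        requested.put("format.type", "csv");
        requested.put("update-mode", "append");

        // Assumed required context of the filesystem CSV source factory (illustrative only)
        Map<String, String> csvFilesystemFactory = new LinkedHashMap<>();
        csvFilesystemFactory.put("connector.type", "filesystem");
        csvFilesystemFactory.put("format.type", "csv");

        System.out.println(mismatches(csvFilesystemFactory, requested));
    }
}
```

In this model, no candidate factory accepts `connector.type=kafka`, which is consistent with the list of considered factories containing only the two filesystem CSV factories.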