Hi Kant,

CSV is supported for Kafka, but you need to download the flink-csv SQL jar [1]
and load it into the SQL CLI using `--library`. This is because the CSV format
factory is implemented in a separate module and is not bundled with the Flink
distribution by default.

[1]:
https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
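
For example, a minimal sketch (the paths here are placeholders for your own
setup), run from the root of a Flink 1.10 distribution:

    wget -P ./sql-libs https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
    ./bin/sql-client.sh embedded --library ./sql-libs

Since your snippet below is a standalone program rather than the SQL CLI, the
equivalent fix there is to put the same module on the application classpath
instead, e.g. by adding the org.apache.flink:flink-csv:1.10.0 dependency to
your build.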

On Sun, 1 Mar 2020 at 03:48, kant kodali <kanth...@gmail.com> wrote:

> Hi,
>
> Is the CSV format supported for Kafka in Flink 1.10? The error below says I
> need to specify connector.type as filesystem, but the documentation says it
> is supported for Kafka.
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> import org.apache.flink.types.Row;
>
> public class Test {
>
>     public static void main(String... args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>
>         StreamExecutionEnvironment streamExecutionEnvironment =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         streamExecutionEnvironment.setStateBackend(
>                 (StateBackend) new RocksDBStateBackend("file:///tmp/rocksdb"));
>         StreamTableEnvironment tableEnvironment =
>                 StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
>         tableEnvironment
>                 .connect(
>                     new Kafka()
>                         .version("0.11")
>                         .topic("test-topic1")
>                 )
>                 .withFormat(new Csv())
>                 .withSchema(new Schema().field("f0", DataTypes.STRING()))
>                 .inAppendMode()
>                 .createTemporaryTable("kafka_source");
>
>         Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
>         tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
>         tableEnvironment.execute("Sample Job");
>     }
> }
>
>
> This code generates the following error:
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>
> Reason: Required context properties mismatch.
>
> The matching candidates:
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
>
> The following properties are requested:
> connector.property-version=1
> connector.topic=test-topic1
> connector.type=kafka
> connector.version=0.11
> format.property-version=1
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=f0
> update-mode=append
>
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
> ... 34 more
