Hi all,

There are a number of connectors which do not appear to be in the Python API 
v1.15.0, e.g. Kinesis. I can see that it's possible to use these connectors by 
using the Table API:

CREATE TABLE my_table (...)
WITH ('connector' = 'kinesis' ...)

I guess if you wanted the stream as a DataStream, you'd create the Table and 
then convert it into a DataStream?
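
For concreteness, here is a rough sketch of the route I have in mind. It is 
untested against a real stream and rests on several assumptions: the Kinesis 
SQL connector jar is already on the classpath, the region is a placeholder, the 
single `payload` column with the 'raw' format stands in for SimpleStringSchema, 
and the helper names (build_kinesis_ddl, kinesis_as_datastream) are my own, not 
part of PyFlink.

```python
def build_kinesis_ddl(table_name, stream_name, region):
    """Return a CREATE TABLE statement for the Kinesis SQL connector.

    The column layout and option values are illustrative assumptions.
    """
    return f"""
        CREATE TABLE {table_name} (
            payload STRING
        ) WITH (
            'connector' = 'kinesis',
            'stream' = '{stream_name}',
            'aws.region' = '{region}',
            'scan.stream.initpos' = 'LATEST',
            'format' = 'raw'
        )
    """


def kinesis_as_datastream(stream_name, region):
    """Register the Kinesis table, then convert it to a DataStream."""
    # Imported here so the DDL helper above works without PyFlink installed.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Define the source through SQL, since there is no Python class for it.
    t_env.execute_sql(build_kinesis_ddl("my_table", stream_name, region))

    # A Kinesis source is insert-only, so an append stream should be enough.
    row_type = Types.ROW_NAMED(["payload"], [Types.STRING()])
    ds = t_env.to_append_stream(t_env.from_path("my_table"), row_type)
    return env, ds
```

Calling kinesis_as_datastream("kinesis_stream_name", "us-east-1") would then 
give back the environment and a DataStream of single-field rows, which could be 
printed with ds.print() followed by env.execute().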

Is there a way of directly instantiating these connectors in PyFlink without 
needing to use SQL like this (and without having to wait until v1.16)? For 
example, the Java API looks like this:

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

Many thanks,

John
