I've had success using the Java API from pyflink via pyflink.java_gateway. Something like:
from pyflink.java_gateway import get_gateway

gateway = get_gateway()
jvm = gateway.jvm  # py4j view of the JVM

# then perhaps something like:
FlinkKinesisConsumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

There also seems to be a nice java_utils.py <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py> with helpers that may, uh, help.

Not sure if this will work; you might need to go through the Python env's underlying Java StreamExecutionEnvironment to do it? Here's an example <https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937> of how the Python StreamExecutionEnvironment calls out to the Java one.

BTW: I'm not an authority, nor have I really tried this, so take this advice with a grain of salt! :) Good luck!

On Fri, Jun 24, 2022 at 9:06 AM John Tipper <john_tip...@hotmail.com> wrote:

> Hi all,
>
> There are a number of connectors which do not appear to be in the Python
> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
> connectors by using the Table API:
>
> CREATE TABLE my_table (...)
> WITH ('connector' = 'kinesis' ...)
>
> I guess if you wanted the stream as a DataStream you'd create the Table
> and then convert it into a DataStream?
>
> Is there a way of directly instantiating these connectors in PyFlink
> without needing to use SQL like this (and without having to wait until
> v1.16)? e.g. the Java API looks like this:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>
> Many thanks,
>
> John
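P.S. To flesh out the gateway idea a little: wiring the Java consumer into the Python environment might look roughly like the sketch below. I haven't run this either — `_j_stream_execution_environment` is a private attribute of the Python StreamExecutionEnvironment (so it may break between versions), and the stream name, region, and property keys are all placeholders. It also assumes the Kinesis connector jar is on the classpath.

```python
from pyflink.datastream import DataStream, StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway

env = StreamExecutionEnvironment.get_execution_environment()
jvm = get_gateway().jvm

# Build the consumer's config as java.util.Properties on the Java side
# (keys and values here are placeholders).
props = jvm.java.util.Properties()
props.setProperty("aws.region", "us-east-1")

# Instantiate the Java connector class directly through py4j.
j_consumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer(
    "kinesis_stream_name",
    jvm.org.apache.flink.api.common.serialization.SimpleStringSchema(),
    props,
)

# Hand the Java source to the underlying Java StreamExecutionEnvironment,
# then wrap the resulting Java DataStream back into a Python DataStream.
j_stream = env._j_stream_execution_environment.addSource(j_consumer)
kinesis = DataStream(j_stream)
```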
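P.P.S. For the Table-then-convert route mentioned in the question, the conversion itself looks straightforward in 1.15 via StreamTableEnvironment.to_data_stream. Again just a sketch I haven't run — the schema and connector options are placeholders, and it assumes the Kinesis connector jar is available:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Placeholder schema and options -- fill in your stream name, region, format, etc.
table_env.execute_sql("""
    CREATE TABLE kinesis_source (
        `data` STRING
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'kinesis_stream_name',
        'aws.region' = 'us-east-1',
        'format' = 'raw'
    )
""")

# Convert the table into a DataStream and continue with the DataStream API.
table = table_env.from_path("kinesis_source")
stream = table_env.to_data_stream(table)
```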