Hi Nick, You need to use another method to add sink to your job - sinkTo. KinesisStreamsSink implements newer Sink interface, while addSink expect old SinkFunction. You can see this by looking at method signatures[1] and in usage examples in documentation[2]
[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837 [2] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink Best, Aleksandr On Thu, 23 May 2024 at 17:19, Nick Hecht <nick.he...@zainartech.com> wrote: > Hello, > > I am currently having issues trying to use the python flink 1.18 > Datastream api with the Amazon Kinesis Data Streams Connector. > > From the documentation > https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/ > I have downloaded the "flink-connector-kinesis" jar > https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar > > and i have added it in my code: > > env = StreamExecutionEnvironment.get_execution_environment() > env.set_runtime_mode(RuntimeExecutionMode.STREAMING) > env.enable_checkpointing(5000) > > env.add_jars( > "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar", > ) > > and it has worked perfectly so far when setting up my kinesis source, i > recently added a kinesis sink to complete my pipeline (was testing with > print before) > > # ds = ds.print() sink = KinesisStreamsSink.builder() \ > .set_kinesis_client_properties(config) \ > .set_serialization_schema(SimpleStringSchema()) \ > .set_partition_key_generator(PartitionKeyGenerator.fixed()) \ > .set_stream_name(stream_name) \ > .build() > > ds = ds.add_sink(sink) > > s_env.execute('pipeline') > > now when i run my python flink application it throws an error at my > add_sink call with the following exception: > > > python locations_flink_app.py > > 2024-05-23 14:53:10,219 - INFO - > apache_beam.typehints.native_type_compatibility - 315 - Using Any for > unsupported type: typing.Sequence[~T] > 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No > module named google.cloud.bigquery_storage_v1. As a result, the > ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`. > Traceback (most recent call last): > File "locations_flink_app.py", line 90, in <module> > setup_flink_app(s_env, props) > File "locations_flink_app.py", line 71, in setup_flink_app > ds = ds.add_sink(create_sink( > File > "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py", > line 819, in add_sink > return > DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function())) > File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line > 1322, in __call__ > return_value = get_return_value( > File > "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line > 146, in deco > return f(*a, **kw) > File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line > 330, in get_return_value > raise Py4JError( > py4j.protocol.Py4JError: An error occurred while calling o245.addSink. > Trace: > org.apache.flink.api.python.shaded.py4j.Py4JException: Method > addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]) > does not exist > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Thread.java:829) > > when i open the jar i downloaded > (flink-sql-connector-kinesis-4.2.0-1.18.jar) i can see it actually has > the classes i need > Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink > has KinesisStreamsSink.class [class > org.apache.flink.connector.kinesis.sink.KinesisStreamsSink] > > If I remove the sink the source still works perfectly fine > (FlinkKinesisConsumer), but I don't understand what I'm missing. The jar > I'm using should have everything. > > anyone else have similar issues? or know what I might need to do? > > > Thank you, > > Nick Hecht >