Hello, I am currently having issues using the PyFlink 1.18 DataStream API with the Amazon Kinesis Data Streams Connector.
From the documentation https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/ I downloaded the "flink-connector-kinesis" jar (https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar) and added it in my code:

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.enable_checkpointing(5000)
env.add_jars(
    "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
)

This has worked perfectly so far for setting up my Kinesis source.
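For context, the (working) source side of the pipeline is wired up roughly like the sketch below, condensing the environment setup from above; the stream name, region, and initial-position values are placeholders rather than my real configuration:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kinesis import FlinkKinesisConsumer

# same environment / jar setup as shown above
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar")

# placeholder consumer properties -- region and initial position are examples only
source_config = {
    'aws.region': 'us-east-1',
    'flink.stream.initpos': 'LATEST',
}

# "my-input-stream" stands in for the real stream name
source = FlinkKinesisConsumer("my-input-stream", SimpleStringSchema(), source_config)
ds = env.add_source(source)

With only this source (and a print() for output) the job runs fine; the problem only appears once I add the Kinesis sink described next.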
I recently added a Kinesis sink to complete the pipeline (I was testing with print before):

# ds = ds.print()
sink = KinesisStreamsSink.builder() \
    .set_kinesis_client_properties(config) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
    .set_stream_name(stream_name) \
    .build()

ds = ds.add_sink(sink)

s_env.execute('pipeline')

Now when I run my PyFlink application it throws an error at the add_sink call with the following exception:

> python locations_flink_app.py
2024-05-23 14:53:10,219 - INFO - apache_beam.typehints.native_type_compatibility - 315 - Using Any for unsupported type: typing.Sequence[~T]
2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Traceback (most recent call last):
  File "locations_flink_app.py", line 90, in <module>
    setup_flink_app(s_env, props)
  File "locations_flink_app.py", line 71, in setup_flink_app
    ds = ds.add_sink(create_sink(
  File "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py", line 819, in add_sink
    return DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
  File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line 330, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]) does not exist
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

When I open the jar I downloaded (flink-sql-connector-kinesis-4.2.0-1.18.jar), I can see it actually has the classes I need: Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink contains KinesisStreamsSink.class, i.e. [class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink].

If I remove the sink, the source still works perfectly fine (FlinkKinesisConsumer), but I don't understand what I'm missing. The jar I'm using should have everything.

Has anyone else run into similar issues, or does anyone know what I might need to do?

Thank you,
Nick Hecht