Hello, I am attempting to use the DataStream API in pyflink (apache-flink==1.20.0) with Kinesis as both source and sink, but I am encountering issues. For the source I am using `FlinkKinesisConsumer` with `SimpleStringSchema` for deserialization, and for the sink I am using `KinesisStreamsSink`, also with `SimpleStringSchema`.
The resulting data stream works fine for piping the data directly between streams unmodified, but as soon as I try to apply any `map` to the data I get Java type errors such as this:

`Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')`

Here is the code that produces this error (applying a no-op map to the input data):

`stream_env.add_source(kinesis_consumer).map(lambda d: d).sink_to(kinesis_sink)`

If I remove the map in the middle everything works fine and I can see data getting sent to the Kinesis sink. If I instead get the type of the input data and print it to the console it shows up as `str`. Any help would be appreciated, thank you.
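
A possible explanation, offered as an assumption rather than a confirmed diagnosis: as far as I understand the PyFlink documentation, a `map` with no `output_type` emits pickled byte arrays (`Types.PICKLED_BYTE_ARRAY()`). The Java side would then receive `byte[]` (printed as `[B`) while the sink's `SimpleStringSchema` expects a `String`. The sketch below uses only the standard library to show how the value can be a `str` inside the Python function and still leave the operator as bytes. The helper names `python_side_value` and `untyped_output` are hypothetical and are not part of PyFlink.

```python
import pickle


def python_side_value(record: str) -> str:
    """What the user function sees: a plain Python str."""
    return record


def untyped_output(record: str) -> bytes:
    """Stand-in (assumption) for an operator with no declared output type:
    the result is pickled, so downstream consumers receive raw bytes."""
    return pickle.dumps(record)


record = "hello"
seen_in_python = python_side_value(record)
sent_downstream = untyped_output(record)

print(type(seen_in_python).__name__)   # str
print(type(sent_downstream).__name__)  # bytes
```

If that is what is happening here, declaring the output type on the operator, for example `.map(lambda d: d, output_type=Types.STRING())` with `Types` imported from `pyflink.common`, may be worth trying. I have not verified this against this exact Kinesis setup.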