Hello,

I am trying to use the DataStream API in PyFlink (apache-flink==1.20.0)
with Kinesis as both source and sink, but I am running into issues. The
source is a FlinkKinesisConsumer with SimpleStringSchema for
deserialization, and the sink is a KinesisStreamsSink, also with
SimpleStringSchema for serialization.

The resulting data stream works fine for piping the data directly between
streams unmodified, but as soon as I try to apply any `map` to the data I
get Java type errors such as this:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class
java.lang.String ([B and java.lang.String are in module java.base of loader
'bootstrap')

Here is the code that produces this error (applying a no-op map to the
input data):

stream_env.add_source(kinesis_consumer).map(lambda d: d).sink_to(kinesis_sink)

If I remove the map in the middle everything works fine and I can see data
getting sent to the Kinesis sink. If I instead get the type of the input
data and print it to the console it shows up as str.
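
One thing I have not ruled out, and this is an assumption rather than
something I have confirmed: when `map` is called without an `output_type`,
PyFlink may hand the result to the JVM as a pickled byte array instead of a
Java String, which would explain the `[B` in the exception even though the
value is a str on the Python side. The standalone sketch below uses only
the standard library (no Flink involved; `no_op` and `wire_value` are
illustrative names) to show how a str can look fine in Python and still
leave as bytes:

```python
import pickle


def no_op(d):
    # Same shape as the no-op lambda passed to map above.
    return d


record = "hello"
result = no_op(record)

# Inside the Python function the value is still a str,
# which matches what I see when printing the type.
python_type = type(result).__name__

# Assumption: without a declared output type, the result is pickled
# before it crosses to the JVM, so Java would receive byte[] ("[B").
wire_value = pickle.dumps(result)
wire_type = type(wire_value).__name__

print(python_type, wire_type)  # prints: str bytes

# If that is the cause, declaring the type might avoid it (untested here):
#   from pyflink.common import Types
#   stream.map(lambda d: d, output_type=Types.STRING())
```

If that guess is right, SimpleStringSchema on the sink would be receiving
byte[] where it expects a String, which matches the ClassCastException.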

Any help would be appreciated, thank you.
