Hello Daniel, (I'm aware of the age of this question, but replying for posterity)
Does the exception still occur if you explicitly specify the output type? See example in Flink public doc:
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/operators/overview/#map

```
data_stream.flat_map(lambda x: x.split(' '), output_type=Types.STRING())
```

Best regards
Keith Lee

On Tue, Feb 11, 2025 at 9:02 PM Daniel Saxton <daniel.sax...@agilebits.com.invalid> wrote:

> Hello,
>
> I am attempting to use the DataStream API in pyflink (apache-flink==1.20.0)
> with Kinesis as both source and sink but encountering issues. For the
> source I am using the FlinkKinesisConsumer with SimpleStringSchema for
> deserialization and KinesisStreamsSink to create my sink, also using
> SimpleStringSchema.
>
> The resulting data stream works fine for piping the data directly between
> streams unmodified, but as soon as I try to apply any `map` to the data I
> get Java type errors such as this:
>
> Caused by: java.lang.ClassCastException: class [B cannot be cast to class
> java.lang.String ([B and java.lang.String are in module java.base of loader
> 'bootstrap')
>
> Here is the code that produces this error (applying a no-op map to the
> input data):
>
> stream_env.add_source(kinesis_consumer).map(lambda d:
> d).sink_to(kinesis_sink)
>
> If I remove the map in the middle everything works fine and I can see data
> getting sent to the Kinesis sink. If I instead get the type of the input
> data and print it to the console it shows up as str.
>
> Any help would be appreciated, thank you.
>
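
P.S. A likely explanation, offered as a guess rather than a confirmed diagnosis: as far as I understand, when no output type is declared PyFlink pickles the result of the Python function, so the JVM receives a byte array (`[B`) while a sink using SimpleStringSchema expects a java.lang.String. The plain-Python sketch below only imitates that hand-off with the standard `pickle` module; it does not use Flink, and `record` and `no_op` are made-up stand-ins for one Kinesis record and the `lambda d: d` from the question.

```python
import pickle

# Hypothetical stand-in for one record read from the source.
# Inside the Python function it is a str, matching what was printed.
record = "hello kinesis"


def no_op(d):
    # Same behaviour as the `lambda d: d` in the question.
    return d


result = no_op(record)

# Assumption: with no declared output type, the result crosses back to the
# JVM in pickled form, i.e. as raw bytes rather than as a string.
payload = pickle.dumps(result)

print(type(result).__name__)   # str
print(type(payload).__name__)  # bytes

# The original value is still recoverable, but only by unpickling it.
assert pickle.loads(payload) == record
```

If that is what is happening, then declaring the type on the no-op map, i.e. `.map(lambda d: d, output_type=Types.STRING())`, should let the sink receive strings again.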