Hi Harshit,

Could you try updating the following line:

`ds = ds.map(lambda x: ','.join([str(value) for value in x]))`

to:

`ds = ds.map(lambda x: ','.join([str(value) for value in x]), output_type=Types.STRING())`
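In context, the relevant lines of your job would then look like this (just a sketch pieced together from your script; `Types` is already imported there):

    ds = env.add_source(kafka_consumer)
    # eval() turns the string "('user1', 1, 2000)" into a Python tuple; the
    # pickled output is fine here because the next operator is also Python
    ds = ds.map(lambda x: eval(x))
    # declaring the output type makes the result a Java String, which the
    # String-serializing Kafka sink expects
    ds = ds.map(lambda x: ','.join([str(value) for value in x]),
                output_type=Types.STRING())
    ds.add_sink(kafka_producer)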
The reason is that if the output type is not specified, the records will be serialized with Pickle and so arrive as byte arrays. That works well for intermediate transformations, since the output of the upstream operation will also be deserialized with Pickle. However, when the output needs to be written to a sink, the output type of the upstream operation must be specified. A small standalone sketch at the end of this message illustrates this. See [1] for more details.

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations

On Sat, Apr 23, 2022 at 1:46 PM harshit.varsh...@iktara.ai <harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
> I am new to PyFlink and request your support with an issue I am facing.
> I am using PyFlink version 1.14.4 and reference code from the PyFlink
> getting started pages.
>
> I am getting the following error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
> : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
> Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
>
> Below is my code for reference:
>
> import os
>
> from pyflink.common import SimpleStringSchema, Types
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>
>
> def main():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     # the sql connector for kafka is used here as it's a fat jar and
>     # could avoid dependency issues
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>     env.add_jars("file:///{}".format(kafka_jar))
>     deserialization_schema = SimpleStringSchema()
>
>     # Test for kafka consumer
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test',
>         deserialization_schema=deserialization_schema,
>         properties={'bootstrap.servers': 'localhost:9093'})
>
>     ds = env.add_source(kafka_consumer)
>     # DATA USED IN KAFKA IS LIKE ('user1', 1, 2000)
>     ds = ds.map(lambda x: eval(x))
>     ds = ds.map(lambda x: ','.join([str(value) for value in x]))
>
>     # ds.print()
>
>     kafka_producer = FlinkKafkaProducer(
>         topic='testresult',
>         serialization_schema=SimpleStringSchema(),
>         producer_config={'bootstrap.servers': 'localhost:9093',
>                          'group.id': 'fraud_test'})
>
>     ds.add_sink(kafka_producer)
>     env.execute('main')
>
>
> if __name__ == '__main__':
>     main()
>
> Thanks and Regards,
> Harshit
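P.S. To make the serialization point above concrete, here is a small standalone sketch you can run without Kafka (the sample data and job name are just placeholders):

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # stand-in for the Kafka source; each element is already a Python tuple
    ds = env.from_collection([('user1', 1, 2000), ('user2', 2, 3000)])

    # Without output_type, the result of this map stays pickled. That is fine
    # while the next operator is another Python function, but a Java sink
    # would receive a byte array ([B), which is exactly the ClassCastException
    # in your stack trace. With output_type=Types.STRING(), the result is
    # handed to Java as a String.
    joined = ds.map(lambda x: ','.join(str(v) for v in x),
                    output_type=Types.STRING())

    joined.print()
    env.execute('output_type_demo')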