Hi Harshit,

Could you try to update the following line `ds = ds.map(lambda x:
','.join([str(value) for value in x]))` as following:
`ds = ds.map(lambda x: ','.join([str(value) for value in x]),
output_type=Types.STRING())`

The reason is that if the output type is not specified, it will be
serialized using Pickle and so it will be a byte array. This works well
for immediate transformations as the output of the upstream operations will
also be deserialized using Pickle. However, when the output needs to be
written to a sink, the output type of the upstream operation must be
specified. See [1] for more details.

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/data_types/#passing-python-records-to-java-operations

On Sat, Apr 23, 2022 at 1:46 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
>
>
> I am new to pyflink and request for your support in issue I am facing with
> Pyflink. I am using Pyflink version 1.14.4 & using reference code from
> pyflink getting started pages.
>
>
>
> I am getting following error .
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
>
> : org.apache.flink.runtime.client.JobExecutionException: Job execution
> failed.
>
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
>
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> Caught exception while processing timer.
>
> Caused by:
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator}
>
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> Caused by: java.lang.ClassCastException: [B cannot be cast to
> java.lang.String
>
>
>
>
>
> Below is my code for reference..
>
>
>
>
>
> import os
>
>
>
> from pyflink.common import SimpleStringSchema
>
> from pyflink.datastream import StreamExecutionEnvironment
>
> from pyflink.datastream.connectors import FlinkKafkaConsumer,
> FlinkKafkaProducer
>
>
>
>
>
> from pyflink.common import Types
>
>
>
>
>
> def main():
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     # the sql connector for kafka is used here as it's a fat jar and could
> avoid dependency issues
>
>     env.set_parallelism(1)
>
>     kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
>
>                              'flink-sql-connector-kafka_2.11-1.14.4.jar')
>
>     env.add_jars("file:///{}".format(kafka_jar))
>
>     deserialization_schema = SimpleStringSchema()
>
>
>
>     # Test for kafka consumer
>
>
>
>     kafka_consumer = FlinkKafkaConsumer(
>
>         topics='test',
>
>         deserialization_schema=deserialization_schema,
>
>         properties={'bootstrap.servers': 'localhost:9093'})
>
>
>
>
>
>     ds = env.add_source(kafka_consumer)
>
>     #DATA USED IN KAFKA IS LIKE ('user1', 1, 2000)
>
>     ds = ds.map(lambda x: eval(x))
>
>     ds = ds.map(lambda x: ','.join([str(value) for value in x]))
>
>
>
>     #ds.print()
>
>
>
>
>
>     kafka_producer = FlinkKafkaProducer(
>
>         topic='testresult',
>
>         serialization_schema=SimpleStringSchema(),
>
>         producer_config={'bootstrap.servers': 'localhost:9093', 'group.id':
> 'fraud_test'})
>
>
>
>     ds.add_sink(kafka_producer)
>
>     env.execute('main')
>
>
>
>
>
> if __name__ == '__main__':
>
>     main()
>
>
>
> Thanks and Regards,
>
> Harshit
>
>
>
>
>
>
>

Reply via email to