Dear Team,

I am new to PyFlink and would appreciate your support with an issue I am facing. I am using PyFlink 1.14.4, and my code is based on the PyFlink getting-started pages.

 

I am getting the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
Caused by: TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator}
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String

Below is my code for reference:

import os

from pyflink.common import SimpleStringSchema, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # The SQL connector for Kafka is used here as it is a fat jar and
    # avoids dependency issues.
    kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'flink-sql-connector-kafka_2.11-1.14.4.jar')
    env.add_jars("file:///{}".format(kafka_jar))

    deserialization_schema = SimpleStringSchema()

    # Test for the Kafka consumer.
    kafka_consumer = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': 'localhost:9093'})

    ds = env.add_source(kafka_consumer)

    # The data in Kafka looks like ('user1', 1, 2000).
    # Parse the incoming string into a Python tuple, then join the fields
    # back into a comma-separated string.
    ds = ds.map(lambda x: eval(x))
    ds = ds.map(lambda x: ','.join([str(value) for value in x]))

    # ds.print()

    kafka_producer = FlinkKafkaProducer(
        topic='testresult',
        serialization_schema=SimpleStringSchema(),
        producer_config={'bootstrap.servers': 'localhost:9093',
                         'group.id': 'fraud_test'})

    ds.add_sink(kafka_producer)
    env.execute('main')


if __name__ == '__main__':
    main()
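One thing I noticed while reading the DataStream.map documentation: [B in the last "Caused by" is the JVM notation for byte[], and when map() is called without an output_type, PyFlink serializes the results with pickle, so they would reach the Kafka sink as byte arrays rather than the Strings that SimpleStringSchema expects. Is the change below the correct fix? This is only my guess from the docs; I have not verified it yet.

    # Suspected fix (my assumption based on the PyFlink docs, not verified):
    # declare the produced type explicitly so the sink receives Java Strings
    # instead of pickled byte arrays.
    ds = ds.map(lambda x: ','.join([str(value) for value in x]),
                output_type=Types.STRING())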

 

Thanks and Regards,

Harshit