Re: Unable to deserialize Avro data using Pyflink

2021-05-21 Thread Dian Fu
Hi Zerah, Sorry for the late response. I agree with your analysis. Currently, for use in the Python DataStream API, we have to provide a Java implementation that produces Row instead of GenericRecord. As far as I know, there is still no built-in DeserializationSchema which could prod
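
To make the GenericRecord-versus-Row distinction concrete: a GenericRecord is accessed by field name, while a Row holds its values by position, with the field names carried separately in its type information. The pure-Python sketch below only illustrates that name-to-position mapping on a decoded record held in a dict. It is not the Java implementation Dian refers to, and the function names and the example schema's fields are hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple


def record_field_names(schema_str: str) -> List[str]:
    """Return the top-level field names of an Avro record schema, in declaration order."""
    schema = json.loads(schema_str)
    if schema.get("type") != "record":
        raise ValueError("expected an Avro record schema")
    return [field["name"] for field in schema["fields"]]


def record_to_row(record: Dict[str, Any], names: List[str]) -> Tuple[Any, ...]:
    """Flatten a name-keyed record into a positional tuple that follows `names`.

    Simplification: a missing field becomes None instead of the schema default.
    """
    return tuple(record.get(name) for name in names)


# Hypothetical schema: namespace and name come from the thread, the fields are made up.
EXAMPLE_SCHEMA = json.dumps({
    "namespace": "com.nextgen.customer",
    "type": "record",
    "name": "employee",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
})

if __name__ == "__main__":
    names = record_field_names(EXAMPLE_SCHEMA)
    print(record_to_row({"name": "Ann", "id": 7}, names))  # (7, 'Ann')
```

The key order of the input dict does not matter; the schema's field list alone fixes each value's position.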

Re: Unable to deserialize Avro data using Pyflink

2021-05-21 Thread Zerah J
Hi Dian, After providing a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not currently supported in PyFlink, I cannot per

Re: Unable to deserialize Avro data using Pyflink

2021-05-20 Thread Zerah J
Hi Dian, Thanks for your support. I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it returns is not currently supported in PyFlink, I am unable to proceed. I can print the datast

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Thanks Dian. It worked for me. Regards, Zerah On Wed, May 19, 2021, 5:14 PM Dian Fu wrote: > Hi Zerah, > > You could try to replace > ``` > value_schema = avro.schema.parse() > ``` > > with the following code: > ``` > JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser > value_schema

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, You could try to replace
```
value_schema = avro.schema.parse()
```
with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```
The reason is that `value_schema = avro.schema.parse()` will
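
Putting the suggestions in this thread together (wrap the Java class the way AvroRowDeserializationSchema does, and build the schema with the Java Avro parser rather than the Python one), a minimal sketch might look like the following. It assumes PyFlink 1.13 with the flink-avro-confluent-registry jar and its dependencies on the Java classpath (for example through the `pipeline.jars` configuration option); the function name is hypothetical. As the later messages note, the resulting stream carries GenericRecord values, which PyFlink cannot yet convert, so this gets as far as printing the data but not transforming it in Python.

```python
def confluent_generic_deserialization_schema(schema_str: str, registry_url: str):
    """Wrap the Java ConfluentRegistryAvroDeserializationSchema for use from PyFlink.

    Hypothetical helper. Assumes PyFlink 1.13 and the flink-avro-confluent-registry
    jar (plus its Avro and Confluent client dependencies) on the Java classpath.
    """
    # Imported here so this definition can be loaded without PyFlink installed.
    from pyflink.common.serialization import DeserializationSchema
    from pyflink.java_gateway import get_gateway

    jvm = get_gateway().jvm
    # Parse with the *Java* Avro parser: the Java factory needs an
    # org.apache.avro.Schema instance, not a Python avro.schema.RecordSchema.
    j_schema = jvm.org.apache.avro.Schema.Parser().parse(schema_str)
    j_confluent = (
        jvm.org.apache.flink.formats.avro.registry.confluent
        .ConfluentRegistryAvroDeserializationSchema
    )
    # forGeneric(Schema, String) yields a schema that produces GenericRecord values.
    j_deserialization_schema = j_confluent.forGeneric(j_schema, registry_url)
    return DeserializationSchema(j_deserialization_schema)
```

The returned object can be passed as the `deserialization_schema` argument of `pyflink.datastream.connectors.FlinkKafkaConsumer`.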

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, The type of value_schema is <class 'avro.schema.RecordSchema'>. I only have a JSON schema string and the schema registry URL. Please find the snippet below: import avro.schema value_schema_str = """ { "namespace": "com.nextgen.customer", "type": "record", "name": "employee"

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Dian Fu
Hi Zerah, What’s the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it’s a class rather than an object. Is this true? Regards, Dian > On May 19, 2021, at 3:41 PM, Zerah J wrote: > > Hi Dian, > > Thanks for your suggestion. > > I tried to invoke ConfluentRe

Re: Unable to deserialize Avro data using Pyflink

2021-05-19 Thread Zerah J
Hi Dian, Thanks for your suggestion. I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below: class MyAvroRowDeserializationSchema(DeserializationSchema): def __init__(self, record_class: str = N

Re: Unable to deserialize Avro data using Pyflink

2021-05-17 Thread Dian Fu
Hi Zerah, I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper for the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example. Regards, Di

Unable to deserialize Avro data using Pyflink

2021-05-17 Thread Zerah J
Hi, I have the following use case:
1. Read streaming data from a Kafka topic using the Flink Python API.
2. Apply transformations on the data stream.
3. Write back to different Kafka topics based on the incoming data.
Input data is coming from a Confluent Avro producer. By using the existing pyflink.common.serializ
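
For context on the input format: the Confluent Avro serializer does not write bare Avro. Each Kafka value starts with a zero magic byte and a 4-byte big-endian schema-registry ID, followed by the Avro binary body. ConfluentRegistryAvroDeserializationSchema reads that header and fetches the writer schema from the registry, while a deserializer that expects bare Avro binary (such as AvroRowDeserializationSchema) would treat those five bytes as record data. A minimal pure-Python sketch of the header split follows; the function name is hypothetical, and the Avro body is left undecoded.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">BI")  # 1-byte magic + 4-byte big-endian schema ID = 5 bytes


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_body)."""
    if len(message) < HEADER.size:
        raise ValueError("message too short for Confluent framing")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    # The remainder is Avro binary written with the schema registered under schema_id.
    return schema_id, message[HEADER.size:]


if __name__ == "__main__":
    # Schema ID 42 followed by an Avro-encoded string "example" (zigzag length 7 -> 0x0e).
    framed = bytes([0]) + (42).to_bytes(4, "big") + b"\x0e" + b"example"
    schema_id, body = split_confluent_frame(framed)
    print(schema_id, len(body))
```

A real consumer would then look up `schema_id` in the registry and decode `body` with that writer schema, which is the work the Java ConfluentRegistryAvroDeserializationSchema already does.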