Hi Dian,

After providing a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I cannot perform any transformations on top of the data:

Caused by: java.lang.UnsupportedOperationException: The type information:
GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}")
is not supported in PyFlink currently.

Is there a way to convert the GenericRecords returned by
ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the
existing AvroRowDeserializationSchema does? Could you please suggest how
to do this, or any other solution to ingest Confluent Avro data from a
Kafka topic?
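For reference, below is roughly how the existing
AvroRowDeserializationSchema is wired up today (a minimal sketch; topic
and broker names are placeholders). It yields Rows that PyFlink
transformations can consume, but it cannot read the Confluent wire
format:

from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Plain Avro schema matching the Employee_Details record above
value_schema_str = """
{"type": "record", "name": "Employee_Details", "namespace": "com.employee",
 "fields": [{"name": "name", "type": "string"},
            {"name": "id", "type": "string"}]}
"""

# Yields Flink Rows, so map/filter work downstream, but this schema
# cannot strip the Confluent magic byte and schema id from the payload.
deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=value_schema_str)

kafka_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})

ds = env.add_source(kafka_source)
ds.map(lambda row: row[0])  # selecting a field works on Rows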
Regards,
Zerah

On Thu, May 20, 2021 at 7:07 PM Zerah J <connectze...@gmail.com> wrote:

> Hi Dian,
>
> Thanks for your support.
>
> I could deserialize the Confluent Avro data using
> ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord
> returned by ConfluentRegistryAvroDeserializationSchema is not supported
> in PyFlink currently, I am unable to proceed.
>
> I can print the datastream using ds.print(). Below is the result:
>
> 3> {"name": "abc", "id": "123"}
> 3> {"name": "cde", "id": "456"}
>
> Apart from this, none of the transformations are working.
>
> Caused by: java.lang.UnsupportedOperationException: The type information:
> GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}")
> is not supported in PyFlink currently.
>
> Is there a way to convert the GenericRecords returned by
> ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the
> existing AvroRowDeserializationSchema does?
> Or please suggest any other way by which I can perform transformations
> and write the data to a Kafka topic.
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:
>
>> Thanks Dian. It worked for me.
>>
>> Regards,
>> Zerah
>>
>> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Zerah,
>>>
>>> You could try to replace
>>> ```
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> ```
>>> with the following code:
>>> ```
>>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>>> value_schema = JSchemaParser().parse(value_schema_str)
>>> ```
>>>
>>> The reason is that ```value_schema = avro.schema.parse(<reader schema
>>> goes here>)``` will create a Python object instead of a Java object.
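>>>
>>> Putting it together with your MyAvroRowDeserializationSchema from the
>>> mail below, a minimal untested sketch (value_schema_str is your JSON
>>> schema string):
>>>
>>> from pyflink.java_gateway import get_gateway
>>>
>>> # Parse on the JVM side: py4j then hands a Java org.apache.avro.Schema
>>> # to forGeneric, instead of a Python avro.schema.RecordSchema object.
>>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>>> value_schema = JSchemaParser().parse(value_schema_str)
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>     avro_schema_string=value_schema, url=schema_url)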
>>>
>>> Regards,
>>> Dian
>>>
>>> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>>>
>>> Hi Dian,
>>>
>>> The type of value_schema is <class 'avro.schema.RecordSchema'>.
>>>
>>> I have only a JSON schema string and the schema registry URL. Please
>>> find the snippet below:
>>>
>>> import avro.schema
>>>
>>> value_schema_str = """
>>> {
>>>     "namespace": "com.nextgen.customer",
>>>     "type": "record",
>>>     "name": "employee",
>>>     "doc": "Customer Details Value Schema.",
>>>     "fields": [
>>>         {
>>>             "doc": "String Value",
>>>             "name": "emp_name",
>>>             "type": "string"
>>>         },
>>>         {
>>>             "doc": "String Value",
>>>             "name": "emp_id",
>>>             "type": "string"
>>>         }
>>>     ]
>>> }
>>> """
>>> value_schema = avro.schema.parse(value_schema_str)
>>> schema_url = "http://host:port"
>>>
>>> How can I create a Java Schema object from this schema string and pass
>>> it from the Python method?
>>>
>>> Regards,
>>> Zerah
>>>
>>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Zerah,
>>>>
>>>> What's the type of value_schema? It should be a Java object of type
>>>> Schema. From the exception, it seems that it's a class instead of an
>>>> object. Is this true?
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>>>
>>>> Hi Dian,
>>>>
>>>> Thanks for your suggestion.
>>>>
>>>> I tried to invoke the
>>>> ConfluentRegistryAvroDeserializationSchema.forGeneric method from
>>>> Python, but it's not working. Kindly check the code snippet below:
>>>>
>>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>>>
>>>>     def __init__(self, record_class: str = None,
>>>>                  avro_schema_string: schema = None, url: str = None):
>>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>>         j_deserialization_schema = \
>>>>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>>>>
>>>> FlinkKafkaConsumer is now invoked as below using
>>>> MyAvroRowDeserializationSchema:
>>>>
>>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>>> schema_url = "http://host:port"
>>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>>     avro_schema_string=value_schema, url=schema_url)
>>>> kafka_source = FlinkKafkaConsumer(
>>>>     topics='my_topic',
>>>>     deserialization_schema=deserialization_schema,
>>>>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>>>>
>>>> I'm getting the below error:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>>     args_command, temp_args = self._build_args(*args)
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>>>
>>>> Please suggest how this method should be called. The schema used here
>>>> is an Avro schema.
>>>>
>>>> Regards,
>>>> Zerah
>>>>
>>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> Hi Zerah,
>>>>>
>>>>> I guess you could provide a Python implementation for
>>>>> ConfluentRegistryAvroDeserializationSchema if needed. It's just a
>>>>> wrapper for the Java implementation, so it will be very easy to
>>>>> implement. You could take a look at AvroRowDeserializationSchema [1]
>>>>> as an example.
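>>>>>
>>>>> Roughly along these lines (an untested sketch; the class name is
>>>>> illustrative, and the flink-avro and Confluent registry jars must be
>>>>> on the classpath):
>>>>>
>>>>> from pyflink.common.serialization import DeserializationSchema
>>>>> from pyflink.java_gateway import get_gateway
>>>>>
>>>>> class ConfluentAvroRowDeserializationSchema(DeserializationSchema):
>>>>>     """Thin wrapper around the Java
>>>>>     ConfluentRegistryAvroDeserializationSchema."""
>>>>>
>>>>>     def __init__(self, avro_schema_string: str, url: str):
>>>>>         gateway = get_gateway()
>>>>>         # Parse the JSON schema into a Java org.apache.avro.Schema.
>>>>>         j_schema = gateway.jvm.org.apache.avro.Schema.Parser() \
>>>>>             .parse(avro_schema_string)
>>>>>         j_schema_cls = gateway.jvm.org.apache.flink.formats.avro \
>>>>>             .registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>>>         j_deserialization_schema = j_schema_cls.forGeneric(j_schema, url)
>>>>>         super(ConfluentAvroRowDeserializationSchema, self).__init__(
>>>>>             j_deserialization_schema)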
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>>>>
>>>>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I have the below use case:
>>>>> >
>>>>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>>>>> > 2. Apply transformations on the data stream
>>>>> > 3. Write back to different Kafka topics based on the incoming data
>>>>> >
>>>>> > Input data is coming from a Confluent Avro producer. By using the
>>>>> > existing pyflink.common.serialization.AvroRowDeserializationSchema,
>>>>> > I'm unable to deserialize the data.
>>>>> >
>>>>> > Please help to process the data, as
>>>>> > ConfluentRegistryAvroDeserializationSchema is not available in the
>>>>> > Python API.
>>>>> >
>>>>> > Regards,
>>>>> > Zerah