Hi Dian,

Thanks for your support.
I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I am unable to proceed. I can print the datastream using ds.print(). Below is the result:

3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}

Apart from this, none of the transformations are working; every attempt fails with:

Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.

Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the existing AvroRowDeserializationSchema does? Or please suggest any other way by which I can perform transformations and write the data to a Kafka topic.

Regards,
Zerah
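One alternative worth sketching, under the assumption of Flink 1.13 with the Kafka connector and avro-confluent SQL format jars on the classpath: the Table API ships an avro-confluent format that performs the registry lookup itself, so the source can be declared in DDL and then converted into a DataStream of Rows, whose type information PyFlink does support. The topic, host, and group names below are placeholders, the columns follow the Employee_Details schema from the exception above, and the option keys follow the Flink 1.13 documentation (worth double-checking against your version):

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# DDL source: the avro-confluent format understands the Confluent wire
# format (magic byte + schema id + Avro payload) and queries the registry.
t_env.execute_sql("""
    CREATE TABLE employee_details (
        `name` STRING,
        `id` STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'host:port',
        'properties.group.id' = 'test_group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://host:port'
    )
""")

# Convert the table into a DataStream of Rows, which DataStream API
# transformations can then consume.
ds = t_env.to_append_stream(
    t_env.from_path('employee_details'),
    Types.ROW_NAMED(['name', 'id'], [Types.STRING(), Types.STRING()]))
```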
On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:

> Thanks Dian. It worked for me
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Zerah,
>>
>> You could try to replace
>> ```
>> value_schema = avro.schema.parse(<reader schema goes here>)
>> ```
>>
>> with the following code:
>> ```
>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>> value_schema = JSchemaParser().parse(value_schema_str)
>> ```
>>
>> The reason is that ```value_schema = avro.schema.parse(<reader schema goes here>)``` will create a Python object instead of a Java object.
>>
>> Regards,
>> Dian
>>
>> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>>
>> Hi Dian,
>>
>> The type of value_schema is <class 'avro.schema.RecordSchema'>.
>>
>> I have only a JSON schema string and the schema registry URL. Please find the snippet below:
>>
>> import avro.schema
>>
>> value_schema_str = """
>> {
>>     "namespace": "com.nextgen.customer",
>>     "type": "record",
>>     "name": "employee",
>>     "doc": "Customer Details Value Schema.",
>>     "fields": [
>>         {
>>             "doc": "String Value",
>>             "name": "emp_name",
>>             "type": "string"
>>         },
>>         {
>>             "doc": "String Value",
>>             "name": "emp_id",
>>             "type": "string"
>>         }
>>     ]
>> }
>> """
>> value_schema = avro.schema.parse(value_schema_str)
>> schema_url = "http://host:port"
>>
>> How can I create a Java Schema object from this schema string and pass it from a Python method?
>>
>> Regards,
>> Zerah
>>
>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Zerah,
>>>
>>> What's the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it's a class instead of an object. Is this true?
>>>
>>> Regards,
>>> Dian
>>>
>>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>>
>>> Hi Dian,
>>>
>>> Thanks for your suggestion.
>>>
>>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:
>>>
>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>>
>>>     def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>         j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>>>
>>> FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:
>>>
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>> kafka_source = FlinkKafkaConsumer(
>>>     topics='my_topic',
>>>     deserialization_schema=deserialization_schema,
>>>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>>>
>>> I'm getting the below error:
>>>
>>> Traceback (most recent call last):
>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>     args_command, temp_args = self._build_args(*args)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>>
>>> Please suggest how this method should be called. The schema used here is an Avro schema.
>>>
>>> Regards,
>>> Zerah
>>>
>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Zerah,
>>>>
>>>> I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper for the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
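For reference, a minimal sketch of such a wrapper, folding in the Java-side schema parsing that Dian suggests higher up in this thread. The class and variable names are mine, and note that the records it produces are still GenericRecords, which is exactly what triggers the type-information error quoted at the top:

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroDeserializationSchema(DeserializationSchema):
    """Wraps ConfluentRegistryAvroDeserializationSchema.forGeneric via py4j."""

    def __init__(self, avro_schema_string: str, url: str):
        gateway = get_gateway()
        # Parse the reader schema on the Java side, so that py4j hands
        # forGeneric a Java org.apache.avro.Schema object rather than a
        # Python avro.schema.RecordSchema.
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        j_deserialization_schema = gateway.jvm.org.apache.flink.formats.avro \
            .registry.confluent.ConfluentRegistryAvroDeserializationSchema \
            .forGeneric(j_schema, url)
        super(ConfluentAvroDeserializationSchema, self).__init__(j_deserialization_schema)
```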
>>>>
>>>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > I have the below use case:
>>>> >
>>>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>>>> > 2. Apply transformations on the data stream
>>>> > 3. Write back to different Kafka topics based on the incoming data
>>>> >
>>>> > The input data comes from a Confluent Avro producer. Using the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.
>>>> >
>>>> > Please help with processing the data, as ConfluentRegistryAvroDeserializationSchema is not available in the Python API.
>>>> >
>>>> > Regards,
>>>> > Zerah
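As a closing sketch of steps 2 and 3 of this use case: assuming `ds` is a DataStream of Rows with the two string fields discussed in this thread (for instance, the stream produced by the Table API sketch near the top), an illustrative transformation plus a JSON write-back could look roughly like this. The topic and host names are placeholders:

```
from pyflink.common import Row
from pyflink.common.serialization import JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer

row_type_info = Types.ROW_NAMED(['name', 'id'], [Types.STRING(), Types.STRING()])

# Step 2: an illustrative transformation, upper-casing the name field.
transformed = ds.map(
    lambda row: Row(row[0].upper(), row[1]),
    output_type=row_type_info)

# Step 3: serialize each Row as JSON and write it back to Kafka.
serialization_schema = JsonRowSerializationSchema.builder() \
    .with_type_info(row_type_info).build()

transformed.add_sink(FlinkKafkaProducer(
    topic='output_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'host:port'}))
```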