Hi Zerah,

Sorry for the late response. I agree with your analysis. Currently, to be 
used in the Python DataStream API, we have to provide a Java implementation 
which produces Row instead of GenericRecord. As far as I know, there is 
still no built-in DeserializationSchema which produces Row when used with 
the Confluent schema registry, so I'm afraid you will have to write such a 
Java implementation yourself for now. You could refer to 
DebeziumAvroDeserializationSchema [1] as an example: it produces RowData on 
top of ConfluentRegistryAvroDeserializationSchema.
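
For the PyFlink side, once you have such a Java class on the classpath, the 
Python wrapper could look roughly like the sketch below. Note that 
com.example.ConfluentRegistryAvroRowDeserializationSchema is a hypothetical 
name for the Java class you would implement; I'm also assuming it exposes a 
forGeneric(Schema, String) factory similar to the existing schemas:

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroRowDeserializationSchema(DeserializationSchema):
    """Thin wrapper around a user-provided Java schema that produces Row."""

    def __init__(self, avro_schema_string: str, url: str):
        gateway = get_gateway()
        # Parse the schema on the Java side so that py4j hands a Java
        # org.apache.avro.Schema object to the factory method below
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        # Hypothetical Java class -- replace with the class you implement
        j_deserialization_schema = gateway.jvm.com.example \
            .ConfluentRegistryAvroRowDeserializationSchema.forGeneric(j_schema, url)
        super(ConfluentAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
```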

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java

> On May 21, 2021, at 8:11 PM, Zerah J <connectze...@gmail.com> wrote:
> 
> Hi Dian,
> 
> After providing a Python implementation for 
> ConfluentRegistryAvroDeserializationSchema, I could deserialize and print 
> the Confluent Avro data using PyFlink. But since the GenericRecord returned 
> by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink 
> currently, I cannot perform any transformations on top of the data.
> 
> Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
> 
> Is there a way to convert the GenericRecords returned by 
> ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the 
> existing AvroRowDeserializationSchema does?
> 
> Could you please suggest how to do this, or any other solution to ingest 
> Confluent Avro data from a Kafka topic?
> 
> 
> 
> Regards,
> Zerah
> 
> 
> On Thu, May 20, 2021 at 7:07 PM Zerah J <connectze...@gmail.com> wrote:
> Hi Dian,
> 
> Thanks for your support.
> 
> I could deserialize the Confluent Avro data using 
> ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord 
> returned by ConfluentRegistryAvroDeserializationSchema is not supported in 
> PyFlink currently, I am unable to proceed.
> 
> I can print the datastream using ds.print(). Below is the result:
> 3> {"name": "abc", "id": "123"}
> 3> {"name": "cde", "id": "456"}
> 
> 
> Apart from this, none of the transformations are working.
> 
> Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
> 
> 
> Is there a way to convert the GenericRecords returned by 
> ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the 
> existing AvroRowDeserializationSchema does? Or please suggest any other way 
> by which I can perform transformations and write the data to a Kafka topic.
> 
> Regards,
> Zerah
> 
> On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:
> Thanks Dian. It worked for me
> 
> Regards,
> Zerah
> 
> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Zerah,
> 
> You could try to replace
> ```
> value_schema = avro.schema.parse(<reader schema goes here>) 
> ```
> 
> with the following code:
> ```
> from pyflink.java_gateway import get_gateway
> 
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema = JSchemaParser().parse(value_schema_str)
> ```
> 
> The reason is that ```value_schema = avro.schema.parse(<reader schema goes 
> here>)``` creates a Python object instead of a Java object, and py4j can 
> only pass references to Java objects into a Java method.
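> 
> With the Java Schema object in hand, the construction from your earlier 
> mail becomes (a sketch, reusing your MyAvroRowDeserializationSchema wrapper 
> and schema_url):
> ```
> value_schema = JSchemaParser().parse(value_schema_str)
> deserialization_schema = MyAvroRowDeserializationSchema(
>     avro_schema_string=value_schema, url=schema_url)
> ```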
> 
> Regards,
> Dian
> 
>> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>> 
>> Hi Dian,
>> 
>> Type of value_schema is <class 'avro.schema.RecordSchema'>
>> 
>> I have only a JSON schema string and the schema registry URL. Please find 
>> the snippet below:
>> 
>> import avro.schema
>> 
>> value_schema_str = """
>>     {           
>>       "namespace": "com.nextgen.customer",
>>       "type": "record",
>>       "name": "employee",   
>>       "doc": "Customer Details Value Schema.",
>>       "fields": [
>>         {
>>           "doc": "String Value",
>>           "name": "emp_name",
>>           "type": "string"
>>         },
>>         {
>>           "doc": "String Value",
>>           "name": "emp_id",
>>           "type": "string"
>>         }
>>       ]
>>     }
>> """
>> value_schema = avro.schema.parse(value_schema_str)
>> schema_url = "http://host:port"
>> 
>> 
>> How can I create a Java Schema object from this schema string and pass it 
>> from a Python method?
>> 
>> 
>> Regards,
>> Zerah
>> 
>> 
>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Zerah,
>> 
>> What's the type of value_schema? It should be a Java object of type 
>> Schema. From the exception, it seems that it's a class instead of an 
>> object. Is this true?
>> 
>> Regards,
>> Dian
>> 
>>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks for your suggestion.
>>> 
>>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric 
>>> method from Python, but it's not working. Kindly check the code snippet 
>>> below:
>>> 
>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>> 
>>>     def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>         j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>>> 
>>> 
>>> FlinkKafkaConsumer is now invoked as below using 
>>> MyAvroRowDeserializationSchema :
>>> 
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>> kafka_source = FlinkKafkaConsumer(
>>>         topics='my_topic',
>>>         deserialization_schema=deserialization_schema,
>>>         properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>>> 
>>> I'm getting the below error:
>>> 
>>> Traceback (most recent call last):
>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>     args_command, temp_args = self._build_args(*args)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>> 
>>> 
>>> 
>>> Please suggest how this method should be called. The schema used here is 
>>> an Avro schema.
>>> 
>>> Regards,
>>> Zerah
>>> 
>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>> Hi Zerah,
>>> 
>>> I guess you could provide a Python implementation for 
>>> ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper 
>>> around the Java implementation, so it should be very easy to implement. You 
>>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>> 
>>> Regards,
>>> Dian
>>> 
>>> [1] 
>>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>> 
>>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>> > 
>>> > Hi,
>>> > 
>>> > I have the below use case:
>>> > 
>>> > 1. Read streaming data from Kafka topic using Flink Python API
>>> > 2. Apply transformations on the data stream
>>> > 3. Write back to different kafka topics based on the incoming data
>>> > 
>>> > Input data is coming from a Confluent Avro producer. Using the existing 
>>> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to 
>>> > deserialize the data.
>>> > 
>>> > Please suggest how to process this data, as 
>>> > ConfluentRegistryAvroDeserializationSchema is not available in the Python 
>>> > API.
>>> > 
>>> > Regards,
>>> > Zerah
>>> 
>> 
> 
