Hi Zerah,

You could try to replace
```
value_schema = avro.schema.parse(<reader schema goes here>) 
```

with the following code:
```
from pyflink.java_gateway import get_gateway

JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```

The reason is that `value_schema = avro.schema.parse(<reader schema goes here>)` creates a Python `avro.schema.RecordSchema` object, whereas `forGeneric` expects a Java `org.apache.avro.Schema` object.
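
For completeness, here is a minimal sketch of how the custom deserialization schema from your earlier snippet could look with the Java parser (untested; the class name and parameters follow your code and are otherwise illustrative):
```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, avro_schema_string: str = None, url: str = None):
        gateway = get_gateway()
        # Parse the JSON schema string into a Java org.apache.avro.Schema object
        j_value_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        # forGeneric expects the Java Schema object and the schema registry URL
        JConfluentAvroDeserializationSchema = gateway.jvm \
            .org.apache.flink.formats.avro.registry.confluent \
            .ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = \
            JConfluentAvroDeserializationSchema.forGeneric(j_value_schema, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
```

Then pass the raw JSON string rather than the parsed Python schema:
```
deserialization_schema = MyAvroRowDeserializationSchema(
    avro_schema_string=value_schema_str, url=schema_url)
```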

Regards,
Dian

> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
> 
> Hi Dian,
> 
> Type of value_schema is <class 'avro.schema.RecordSchema'>
> 
> I only have a JSON schema string and the schema registry URL. Please find the
> snippet below:
> 
> import avro.schema
> 
> value_schema_str = """
>     {           
>       "namespace": "com.nextgen.customer",
>       "type": "record",
>       "name": "employee",   
>       "doc": "Customer Details Value Schema.",
>       "fields": [
>         {
>           "doc": "String Value",
>           "name": "emp_name",
>           "type": "string"
>         },
>         {
>           "doc": "String Value",
>           "name": "emp_id",
>           "type": "string"
>         }
>       ]
>     }
>     """
> value_schema = avro.schema.parse(value_schema_str)
> schema_url = "http://host:port"
> 
> 
> How can I create a Java Schema object from this schema string and pass it
> from a Python method?
> 
> 
> Regards,
> Zerah
> 
> 
> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Zerah,
> 
> What’s the type of value_schema? It should be a Java object of type Schema.
> From the exception, it seems that it’s a Python object instead of a Java
> object. Is this true?
> 
> Regards,
> Dian
> 
>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>> 
>> Hi Dian,
>> 
>> Thanks for your suggestion.
>> 
>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
>> method from Python, but it's not working. Kindly check the code snippet
>> below:
>> 
>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>> 
>>     def __init__(self, record_class: str = None, avro_schema_string: schema = None, url: str = None):
>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>         j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>> 
>> 
>> FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:
>> 
>> value_schema = avro.schema.parse(<reader schema goes here>)
>> schema_url = "http://host:port"
>> deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>> kafka_source = FlinkKafkaConsumer(
>>         topics='my_topic',
>>         deserialization_schema=deserialization_schema,
>>         properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>> 
>> I'm getting the below error:
>> 
>> Traceback (most recent call last):
>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>     args_command, temp_args = self._build_args(*args)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>     [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>     [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>> 
>> 
>> 
>> Please suggest how this method should be called. The schema used here is an
>> Avro schema.
>> 
>> Regards,
>> Zerah
>> 
>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Zerah,
>> 
>> I guess you could provide a Python implementation for
>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>> for the Java implementation, so it will be very easy to implement. You
>> could take a look at AvroRowDeserializationSchema [1] as an example.
>> 
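>> For reference, the pattern in [1] boils down to the following shape (a
>> simplified, untested sketch; the class name is illustrative):
>> 
>> from pyflink.common.serialization import DeserializationSchema
>> from pyflink.java_gateway import get_gateway
>> 
>> class MyDeserializationSchema(DeserializationSchema):
>>     def __init__(self, avro_schema_string: str):
>>         # Build the Java deserialization schema via py4j and hand it to
>>         # the base class, which keeps a reference to it for the runtime.
>>         JAvroRowDeserializationSchema = get_gateway().jvm \
>>             .org.apache.flink.formats.avro.AvroRowDeserializationSchema
>>         j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)
>>         super(MyDeserializationSchema, self).__init__(j_deserialization_schema)
>> 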
>> Regards,
>> Dian
>> 
>> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>> 
>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>> > 
>> > Hi,
>> > 
>> > I have the below use case:
>> > 
>> > 1. Read streaming data from Kafka topic using Flink Python API
>> > 2. Apply transformations on the data stream
>> > 3. Write back to different kafka topics based on the incoming data
>> > 
>> > Input data is coming from a Confluent Avro producer. Using the existing
>> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
>> > deserialize the data.
>> > 
>> > Please suggest how to process the data, as
>> > ConfluentRegistryAvroDeserializationSchema is not available in the Python
>> > API.
>> > 
>> > Regards,
>> > Zerah
>> 
> 
