Hi Zerah,

What’s the type of value_schema? It should be a Java object of type org.apache.avro.Schema.
From the exception, it seems that it’s a Python object (an avro RecordSchema) rather than a Java object. Is this true?
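
One way to get such a Java object is to serialize the Python schema to JSON and let Java's `Schema.Parser` rebuild it on the JVM side, so that `forGeneric(Schema, String)` receives a Java object. The following is a minimal sketch. It assumes the unshaded `org.apache.avro` classes (from the flink-avro and flink-avro-confluent-registry jars and their Avro dependency) are on the JVM classpath. If a fat jar relocates Avro, the class path would differ. The helper names `schema_to_json` and `confluent_generic_j_schema` are hypothetical. The optional `jvm` argument exists only so the logic can be exercised without a running gateway.

```python
import json
from typing import Any, Optional

# Java class used in this thread (fully qualified name).
_CONFLUENT_DESER = ("org.apache.flink.formats.avro.registry.confluent."
                    "ConfluentRegistryAvroDeserializationSchema")


def _jvm_attr(jvm: Any, dotted_name: str) -> Any:
    """Resolve a dotted name on a py4j JVM view, like jvm.org.apache.avro.Schema."""
    obj = jvm
    for part in dotted_name.split("."):
        obj = getattr(obj, part)
    return obj


def schema_to_json(schema: Any) -> str:
    """Return a JSON string for a JSON string, a dict, or a Python avro Schema."""
    if isinstance(schema, str):
        return schema
    if isinstance(schema, dict):
        return json.dumps(schema)
    # Assumption: the Python avro package's Schema.__str__ yields its JSON form.
    return str(schema)


def confluent_generic_j_schema(schema: Any, registry_url: str,
                               jvm: Optional[Any] = None) -> Any:
    """Build the Java ConfluentRegistryAvroDeserializationSchema for GenericRecord."""
    if jvm is None:
        # Requires PyFlink and a running (or launchable) Java gateway.
        from pyflink.java_gateway import get_gateway
        jvm = get_gateway().jvm
    # Java equivalent: new org.apache.avro.Schema.Parser().parse(json)
    parser = _jvm_attr(jvm, "org.apache.avro.Schema").Parser()
    j_schema = parser.parse(schema_to_json(schema))
    # forGeneric(Schema, String) now receives a Java Schema, not a Python object.
    return _jvm_attr(jvm, _CONFLUENT_DESER).forGeneric(j_schema, registry_url)
```

Inside `MyAvroRowDeserializationSchema.__init__`, the result of a helper like this could be passed to `super().__init__(...)` in place of the direct `forGeneric` call.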

Regards,
Dian

> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
> 
> Hi Dian,
> 
> Thanks for your suggestion.
> 
> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
> method from Python, but it's not working. Kindly check the code snippet below:
> 
> from pyflink.common.serialization import DeserializationSchema
> from pyflink.java_gateway import get_gateway
>
>
> class MyAvroRowDeserializationSchema(DeserializationSchema):
>
>     def __init__(self, record_class: str = None, avro_schema_string=None,
>                  url: str = None):
>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>             .org.apache.flink.formats.avro.registry.confluent \
>             .ConfluentRegistryAvroDeserializationSchema
>         j_deserialization_schema = \
>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
> 
> 
> FlinkKafkaConsumer is now invoked as follows, using MyAvroRowDeserializationSchema:
> 
> import avro.schema
> from pyflink.datastream.connectors import FlinkKafkaConsumer
>
> value_schema = avro.schema.parse(<reader schema goes here>)
> schema_url = "http://host:port"
> deserialization_schema = MyAvroRowDeserializationSchema(
>     avro_schema_string=value_schema, url=schema_url)
> kafka_source = FlinkKafkaConsumer(
>     topics='my_topic',
>     deserialization_schema=deserialization_schema,
>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
> 
> I'm getting the following error:
> 
> Traceback (most recent call last):
>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema,url=schema_url)
>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>     args_command, temp_args = self._build_args(*args)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>     command_part = REFERENCE_TYPE + parameter._get_object_id()
> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>
> Please suggest how this method should be called. The schema used here is an
> Avro schema.
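
The last frames of the traceback show the mechanism. For an argument that py4j does not send by value, `get_command_part` calls `parameter._get_object_id()`, which only Java proxy objects provide. A Python `RecordSchema` therefore fails with an AttributeError. The following is a rough pre-flight check along those lines. `check_py4j_args` is a hypothetical helper, and its by-value type list is a simplified approximation of py4j's rules, not a copy of them:

```python
from typing import Any

# Simplified approximation of values py4j sends by value (not a full list).
_BY_VALUE_TYPES = (type(None), bool, int, float, str, bytes)


def check_py4j_args(*args: Any) -> None:
    """Raise a clear TypeError for arguments py4j would reject with AttributeError."""
    for index, arg in enumerate(args):
        if isinstance(arg, _BY_VALUE_TYPES):
            continue
        # Java proxies (e.g. py4j JavaObject) expose _get_object_id().
        if not hasattr(arg, "_get_object_id"):
            raise TypeError(
                f"argument {index} is a Python {type(arg).__name__}, "
                "not a Java object; convert it before calling into the JVM")
```

Calling `check_py4j_args(avro_schema_string, url)` just before `forGeneric` would turn the AttributeError into a TypeError that names the offending Python type (here, `RecordSchema`).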
> 
> Regards,
> Zerah
> 
> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Zerah,
> 
> I guess you could provide a Python implementation of
> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper around
> the Java implementation, so it will be very easy to implement. You could
> take a look at AvroRowDeserializationSchema [1] as an example.
> 
> Regards,
> Dian
> 
> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
> 
> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
> > 
> > Hi,
> > 
> > I have the following use case:
> > 
> > 1. Read streaming data from Kafka topic using Flink Python API
> > 2. Apply transformations on the data stream
> > 3. Write back to different Kafka topics based on the incoming data
> > 
> > The input data comes from a Confluent Avro producer. Using the existing
> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
> > deserialize the data.
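
Messages produced by Confluent's Avro serializer use the Confluent wire format. Each message starts with one magic byte (`0`), followed by a 4-byte big-endian schema ID from the schema registry, and then the Avro binary payload. A deserializer that expects bare Avro bytes, such as `AvroRowDeserializationSchema`, will try to decode that header as record data. This is a likely cause of the failure described here. The sketch below only illustrates the byte layout, and `split_confluent_header` is a hypothetical helper. In a Flink job, `ConfluentRegistryAvroDeserializationSchema` handles the framing and the registry lookup on the Java side.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_header(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_body)."""
    if len(message) < 5:
        raise ValueError("message is shorter than the 5-byte Confluent header")
    # ">BI": big-endian unsigned byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent-framed?")
    # The remainder is plain Avro binary, written with schema `schema_id`.
    return schema_id, message[5:]
```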
> > 
> > Please help me process this data, as ConfluentRegistryAvroDeserializationSchema
> > is not available in the Python API.
> > 
> > Regards,
> > Zerah
> 
