Thanks, Dian. It worked for me.

Regards,
Zerah
On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Zerah,
>
> You could try to replace
>
> ```
> value_schema = avro.schema.parse(<reader schema goes here>)
> ```
>
> with the following code:
>
> ```
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema = JSchemaParser().parse(value_schema_str)
> ```
>
> The reason is that `value_schema = avro.schema.parse(<reader schema goes here>)` creates a Python object instead of a Java object.
>
> Regards,
> Dian
>
> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>
> Hi Dian,
>
> The type of value_schema is <class 'avro.schema.RecordSchema'>.
>
> I have only a JSON schema string and the schema registry URL. Please find the snippet below:
>
> ```
> import avro.schema
>
> value_schema_str = """
> {
>     "namespace": "com.nextgen.customer",
>     "type": "record",
>     "name": "employee",
>     "doc": "Customer Details Value Schema.",
>     "fields": [
>         {
>             "doc": "String Value",
>             "name": "emp_name",
>             "type": "string"
>         },
>         {
>             "doc": "String Value",
>             "name": "emp_id",
>             "type": "string"
>         }
>     ]
> }
> """
>
> value_schema = avro.schema.parse(value_schema_str)
> schema_url = "http://host:port"
> ```
>
> How can I create a Java Schema object from this schema string and pass it from a Python method?
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Zerah,
>>
>> What's the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it's a class instead of an object. Is this true?
>>
>> Regards,
>> Dian
>>
>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>
>> Hi Dian,
>>
>> Thanks for your suggestion.
>>
>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working.
>> Kindly check the code snippet below:
>>
>> ```
>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>
>>     def __init__(self, record_class: str = None,
>>                  avro_schema_string: schema = None, url: str = None):
>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>         j_deserialization_schema = \
>>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>> ```
>>
>> FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:
>>
>> ```
>> value_schema = avro.schema.parse(<reader schema goes here>)
>> schema_url = "http://host:port"
>> deserialization_schema = MyAvroRowDeserializationSchema(
>>     avro_schema_string=value_schema, url=schema_url)
>> kafka_source = FlinkKafkaConsumer(
>>     topics='my_topic',
>>     deserialization_schema=deserialization_schema,
>>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>> ```
>>
>> I'm getting the below error:
>>
>> ```
>> Traceback (most recent call last):
>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>     args_command, temp_args = self._build_args(*args)
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>     [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>     [get_command_part(arg, self.pool) for arg in new_args])
>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>> ```
>>
>> Please suggest how this method should be called. The schema used here is an Avro schema.
>>
>> Regards,
>> Zerah
>>
>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Zerah,
>>>
>>> I guess you could provide a Python implementation of ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper around the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.
>>>
>>> Regards,
>>> Dian
>>>
>>> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>>
>>> On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>> >
>>> > Hi,
>>> >
>>> > I have the below use case:
>>> >
>>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>>> > 2. Apply transformations on the data stream
>>> > 3. Write back to different Kafka topics based on the incoming data
>>> >
>>> > Input data is coming from a Confluent Avro producer. Using the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.
>>> >
>>> > Please help to process the data, as ConfluentRegistryAvroDeserializationSchema is not available in the Python API.
>>> >
>>> > Regards,
>>> > Zerah
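[Editor's note] The AttributeError in the thread arises because py4j can only pass Java object references across the gateway: a py4j JavaObject carries a `_get_object_id`, while the Python-side `avro.schema.RecordSchema` does not, hence Dian's fix of parsing the string with the JVM's `org.apache.avro.Schema.Parser` instead. Since an Avro schema is plain JSON, the string itself can be sanity-checked without Flink or py4j. A minimal, stdlib-only sketch (using the schema string from the thread, with its closing triple-quote restored):

```python
import json

# Reader schema from the thread; Avro schemas are ordinary JSON documents.
value_schema_str = """
{
    "namespace": "com.nextgen.customer",
    "type": "record",
    "name": "employee",
    "doc": "Customer Details Value Schema.",
    "fields": [
        {"doc": "String Value", "name": "emp_name", "type": "string"},
        {"doc": "String Value", "name": "emp_id", "type": "string"}
    ]
}
"""

# Well-formedness check: if this raises, the JVM-side Schema.Parser
# would reject the string too.
schema = json.loads(value_schema_str)
field_names = [f["name"] for f in schema["fields"]]
print(field_names)  # ['emp_name', 'emp_id']
```

On the PyFlink side, this same string (not the parsed Python object) is what `JSchemaParser().parse(value_schema_str)` consumes; the result is a Java `org.apache.avro.Schema` reference that py4j can hand to `forGeneric`.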