Hi Dian,

The type of value_schema is <class 'avro.schema.RecordSchema'>.
I have only a JSON schema string and the Schema Registry URL. Please find the snippet below:

import avro.schema

value_schema_str = """
{
    "namespace": "com.nextgen.customer",
    "type": "record",
    "name": "employee",
    "doc": "Customer Details Value Schema.",
    "fields": [
        {
            "doc": "String Value",
            "name": "emp_name",
            "type": "string"
        },
        {
            "doc": "String Value",
            "name": "emp_id",
            "type": "string"
        }
    ]
}
"""

value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"

How can I create a Java Schema object from this schema string and pass it from the Python method?
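For reference, here is the direction I had in mind -- only a rough sketch, assuming py4j can reach the nested org.apache.avro.Schema.Parser class through the gateway and that the Avro jar is on the JVM classpath:

from pyflink.java_gateway import get_gateway

# Parse the JSON schema text on the Java side, so that the result is a
# real org.apache.avro.Schema object rather than a Python RecordSchema.
j_schema = get_gateway().jvm.org.apache.avro.Schema.Parser().parse(value_schema_str)

Would something along these lines work?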
Regards,
Zerah

On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Zerah,
>
> What's the type of value_schema? It should be a Java object of type
> Schema. From the exception, it seems that it's a class instead of an
> object. Is this true?
>
> Regards,
> Dian
>
> On 19 May 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks for your suggestion.
>
> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
> method from Python, but it's not working. Kindly check the code snippet
> below:
>
> class MyAvroRowDeserializationSchema(DeserializationSchema):
>
>     def __init__(self, record_class: str = None,
>                  avro_schema_string: schema = None, url: str = None):
>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>         j_deserialization_schema = \
>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>
> FlinkKafkaConsumer is now invoked as below using MyAvroRowDeserializationSchema:
>
> value_schema = avro.schema.parse(<reader schema goes here>)
> schema_url = "http://host:port"
> deserialization_schema = MyAvroRowDeserializationSchema(
>     avro_schema_string=value_schema, url=schema_url)
> kafka_source = FlinkKafkaConsumer(
>     topics='my_topic',
>     deserialization_schema=deserialization_schema,
>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>
> I'm getting the below error:
>
> Traceback (most recent call last):
>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>     args_command, temp_args = self._build_args(*args)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>     command_part = REFERENCE_TYPE + parameter._get_object_id()
> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>
> Please suggest how this method should be called. The schema used here is
> an Avro schema.
>
> Regards,
> Zerah
>
> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Zerah,
>>
>> I guess you could provide a Python implementation of
>> ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper
>> around the Java implementation, so it will be very easy to implement. You
>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>
>> Regards,
>> Dian
>>
>> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>
>> On 17 May 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I have the below use case:
>> >
>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>> > 2. Apply transformations on the data stream
>> > 3. Write back to different Kafka topics based on the incoming data
>> >
>> > The input data comes from a Confluent Avro producer. Using the existing
>> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable
>> > to deserialize the data.
>> >
>> > Please suggest how to process this data, since
>> > ConfluentRegistryAvroDeserializationSchema is not available in the
>> > Python API.
>> >
>> > Regards,
>> > Zerah
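P.S. Putting your wrapper suggestion together with the parser idea above, this is the rough, untested sketch I plan to try next. It assumes the flink-avro Confluent registry classes and the Avro jar are on the JVM classpath, and that str(value_schema) returns the schema's JSON text when only the parsed RecordSchema is at hand:

from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class MyAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, avro_schema_string: str = None, url: str = None):
        gateway = get_gateway()
        # Build the org.apache.avro.Schema on the Java side. Passing the
        # Python RecordSchema object directly is what caused the
        # "_get_object_id" error, because py4j cannot serialize it.
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        JConfluentRegistryAvroDeserializationSchema = gateway.jvm \
            .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
        j_deserialization_schema = \
            JConfluentRegistryAvroDeserializationSchema.forGeneric(j_schema, url)
        super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


# Hypothetical usage, passing the JSON text instead of the RecordSchema object:
# deserialization_schema = MyAvroRowDeserializationSchema(
#     avro_schema_string=str(value_schema), url=schema_url)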