Hi Zerah,

Sorry for the late response. I agree with your analysis. Currently, to be usable in the Python DataStream API, we have to provide a Java implementation which produces Row instead of GenericRecord. As far as I know, there is still no built-in DeserializationSchema which produces Row while working with the Confluent schema registry, so I'm afraid you will have to write a Java implementation yourself for now. You could refer to DebeziumAvroDeserializationSchema [1] as an example; it produces RowData on top of ConfluentRegistryAvroDeserializationSchema.
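For illustration, once such a Java class exists, the Python side only needs to wrap it, following the same pattern as your MyAvroRowDeserializationSchema. A minimal sketch (the Java class name com.mycompany.ConfluentRegistryAvroRowDeserializationSchema below is hypothetical and stands for whatever class you implement):

```
from pyflink.common.serialization import DeserializationSchema
from pyflink.java_gateway import get_gateway


class ConfluentAvroRowDeserializationSchema(DeserializationSchema):

    def __init__(self, avro_schema_string: str, url: str):
        gateway = get_gateway()
        # Parse the schema on the Java side so that py4j passes a Java
        # org.apache.avro.Schema object rather than a Python one.
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
        # Hypothetical self-written Java class that wraps
        # ConfluentRegistryAvroDeserializationSchema and converts each
        # GenericRecord into a Row, similar to what
        # DebeziumAvroDeserializationSchema does for RowData.
        JSchema = gateway.jvm.com.mycompany.ConfluentRegistryAvroRowDeserializationSchema
        super(ConfluentAvroRowDeserializationSchema, self).__init__(JSchema(j_schema, url))
```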
Regards,
Dian

[1] https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java

> On May 21, 2021, at 8:11 PM, Zerah J <connectze...@gmail.com> wrote:
> 
> Hi Dian,
> 
> With a Python implementation for ConfluentRegistryAvroDeserializationSchema, I could deserialize and print the Confluent Avro data using PyFlink. But since the GenericRecord returned by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink currently, I cannot perform any transformations on top of the data:
> 
> Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
> 
> Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, like the existing AvroRowDeserializationSchema does?
> 
> Could you please suggest how to do this, or any other way to ingest Confluent Avro data from a Kafka topic?
> 
> Regards,
> Zerah
> 
> On Thu, May 20, 2021 at 7:07 PM Zerah J <connectze...@gmail.com> wrote:
> Hi Dian,
> 
> Thanks for your support.
> 
> I could deserialize the Confluent Avro data using ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it returns is not supported in PyFlink currently, I am unable to proceed.
> 
> I can print the datastream using ds.print. Below is the result:
> 
> 3> {"name": "abc", "id": "123"}
> 3> {"name": "cde", "id": "456"}
> 
> Apart from this, none of the transformations are working:
> 
> Caused by: java.lang.UnsupportedOperationException: The type information: GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}") is not supported in PyFlink currently.
> 
> Is there a way to convert the GenericRecords returned by ConfluentRegistryAvroDeserializationSchema into Flink Rows, like the existing AvroRowDeserializationSchema does? Or please suggest any other way by which I can perform transformations and write the data to a Kafka topic.
> 
> Regards,
> Zerah
> 
> On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:
> Thanks Dian. It worked for me.
> 
> Regards,
> Zerah
> 
> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Zerah,
> 
> You could try to replace
> ```
> value_schema = avro.schema.parse(<reader schema goes here>)
> ```
> with the following code:
> ```
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema = JSchemaParser().parse(value_schema_str)
> ```
> 
> The reason is that value_schema = avro.schema.parse(<reader schema goes here>) will create a Python object instead of a Java object.
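> For example, putting the pieces together (just a sketch, reusing the MyAvroRowDeserializationSchema wrapper and the value_schema_str / schema_url variables from your earlier snippets):
> ```
> from pyflink.java_gateway import get_gateway
> 
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> # value_schema is now a Java org.apache.avro.Schema object,
> # so py4j can pass it through to the Java side.
> value_schema = JSchemaParser().parse(value_schema_str)
> deserialization_schema = MyAvroRowDeserializationSchema(
>     avro_schema_string=value_schema, url=schema_url)
> ```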
> 
> Regards,
> Dian
> 
>> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>> 
>> Hi Dian,
>> 
>> The type of value_schema is <class 'avro.schema.RecordSchema'>.
>> 
>> I have only a JSON schema string and the schema registry URL. Please find the snippet below:
>> 
>> import avro.schema
>> 
>> value_schema_str = """
>> {
>>     "namespace": "com.nextgen.customer",
>>     "type": "record",
>>     "name": "employee",
>>     "doc": "Customer Details Value Schema.",
>>     "fields": [
>>         {
>>             "doc": "String Value",
>>             "name": "emp_name",
>>             "type": "string"
>>         },
>>         {
>>             "doc": "String Value",
>>             "name": "emp_id",
>>             "type": "string"
>>         }
>>     ]
>> }
>> """
>> value_schema = avro.schema.parse(value_schema_str)
>> schema_url = "http://host:port"
>> 
>> How can I create a Java Schema object from this schema string and pass it from the Python method?
>> 
>> Regards,
>> Zerah
>> 
>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>> Hi Zerah,
>> 
>> What's the type of value_schema? It should be a Java object of type Schema. From the exception, it seems that it's a class instead of an object. Is this true?
>> 
>> Regards,
>> Dian
>> 
>>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks for your suggestion.
>>> 
>>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python, but it's not working. Kindly check the code snippet below:
>>> 
>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>> 
>>>     def __init__(self, record_class: str = None, avro_schema_string: str = None, url: str = None):
>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>             .org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>         j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>>> 
>>> FlinkKafkaConsumer is now invoked as below, using MyAvroRowDeserializationSchema:
>>> 
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>> kafka_source = FlinkKafkaConsumer(
>>>     topics='my_topic',
>>>     deserialization_schema=deserialization_schema,
>>>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>>> 
>>> I'm getting the below error:
>>> 
>>> Traceback (most recent call last):
>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>     args_command, temp_args = self._build_args(*args)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>> 
>>> Please suggest how this method should be called. The schema used here is an Avro schema.
>>> 
>>> Regards,
>>> Zerah
>>> 
>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>> Hi Zerah,
>>> 
>>> I guess you could provide a Python implementation for ConfluentRegistryAvroDeserializationSchema if needed. It's just a wrapper for the Java implementation, so it will be very easy to implement. You could take a look at AvroRowDeserializationSchema [1] as an example.
>>> 
>>> Regards,
>>> Dian
>>> 
>>> [1] https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>> 
>>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>> > 
>>> > Hi,
>>> > 
>>> > I have the below use case:
>>> > 
>>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>>> > 2. Apply transformations on the data stream
>>> > 3. Write back to different Kafka topics based on the incoming data
>>> > 
>>> > The input data comes from a Confluent Avro producer. Using the existing pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to deserialize the data.
>>> > 
>>> > Please help with processing the data, as ConfluentRegistryAvroDeserializationSchema is not available in the Python API.
>>> > 
>>> > Regards,
>>> > Zerah