Hi Dian,

After providing a Python implementation for
ConfluentRegistryAvroDeserializationSchema, I can deserialize and print
the Confluent Avro data using PyFlink. But since the GenericRecord returned
by ConfluentRegistryAvroDeserializationSchema is not currently supported in
PyFlink, I cannot perform any transformations on the data.

Caused by: java.lang.UnsupportedOperationException: The type information:
GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee",
"doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string",
"doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}")
is not supported in PyFlink currently.

Is there a way to convert the GenericRecords returned by
ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the
existing AvroRowDeserializationSchema does?

Could you please suggest how to do this, or any other solution for
ingesting Confluent Avro data from a Kafka topic?
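
As a possible alternative, I am also looking at the Table API's
avro-confluent format, which seems to handle the Schema Registry wire
format directly. Below is an untested sketch of what I have in mind
(option names taken from the Flink 1.13 docs; it assumes the
flink-sql-avro-confluent-registry jar is on the classpath, and the field
names are from my schema above):

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Kafka source table whose value format is resolved via the Schema Registry
t_env.execute_sql("""
    CREATE TABLE employee_details (
        name STRING,
        id STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'host:port',
        'properties.group.id' = 'test_group',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://host:port'
    )
""")
```

Would this be a reasonable direction, or is there a way to stay on the
DataStream API?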



Regards,
Zerah


On Thu, May 20, 2021 at 7:07 PM Zerah J <connectze...@gmail.com> wrote:

> Hi Dian,
>
> Thanks for your support.
>
> I could deserialize the Confluent Avro data using
> ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord
> returned by ConfluentRegistryAvroDeserializationSchema is not supported in
> PyFlink currently, I am unable to proceed.
>
> I can print the datastream using ds.print(). Below is the result:
> 3> {"name": "abc", "id": "123"}
> 3> {"name": "cde", "id": "456"}
>
>
> Apart from this, none of the transformations are working.
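> For instance, even a trivial map fails with the error below (illustrative
> snippet, not my exact code):
>
> ```
> from pyflink.common.typeinfo import Types
>
> ds.map(lambda record: str(record), output_type=Types.STRING())
> ```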
>
> Caused by: java.lang.UnsupportedOperationException: The type information:
> GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee",
> "doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string",
> "doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}")
> is not supported in PyFlink currently.
>
>
> Is there a way to convert the GenericRecords returned by
> ConfluentRegistryAvroDeserializationSchema into Flink Rows, the way the
> existing AvroRowDeserializationSchema does?
> Or please suggest any other way by which I can perform transformations
> and write the data to a Kafka topic.
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:
>
>> Thanks Dian. It worked for me
>>
>> Regards,
>> Zerah
>>
>> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Zerah,
>>>
>>> You could try to replace
>>> ```
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> ```
>>>
>>> with the following code:
>>> ```
>>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>>> value_schema = JSchemaParser().parse(value_schema_str)
>>> ```
>>>
>>> The reason is that ```value_schema = avro.schema.parse(<reader schema
>>> goes here>)``` will create a Python object instead of a Java object.
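>>>
>>> For example, together with the MyAvroRowDeserializationSchema wrapper
>>> from your earlier mail, it could look like this (untested sketch):
>>>
>>> ```
>>> from pyflink.java_gateway import get_gateway
>>>
>>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>>> # This is now a Java org.apache.avro.Schema object, which py4j can pass
>>> # through to forGeneric
>>> value_schema = JSchemaParser().parse(value_schema_str)
>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>     avro_schema_string=value_schema, url=schema_url)
>>> ```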
>>>
>>> Regards,
>>> Dian
>>>
>>> On May 19, 2021, at 5:23 PM, Zerah J <connectze...@gmail.com> wrote:
>>>
>>> Hi Dian,
>>>
>>> The type of value_schema is <class 'avro.schema.RecordSchema'>
>>>
>>> I only have a JSON schema string and the Schema Registry URL. Please
>>> find the snippet below:
>>>
>>> import avro.schema
>>>
>>> value_schema_str = """
>>>     {
>>>       "namespace": "com.nextgen.customer",
>>>       "type": "record",
>>>       "name": "employee",
>>>       "doc": "Customer Details Value Schema.",
>>>       "fields": [
>>>         {
>>>           "doc": "String Value",
>>>           "name": "emp_name",
>>>           "type": "string"
>>>         },
>>>         {
>>>           "doc": "String Value",
>>>           "name": "emp_id",
>>>           "type": "string"
>>>         }
>>>       ]
>>>     }
>>> """
>>> value_schema = avro.schema.parse(value_schema_str)
>>> schema_url = "http://host:port"
>>>
>>>
>>> How can I create a Java Schema object from this schema string and pass
>>> it from the Python method?
>>>
>>>
>>> Regards,
>>> Zerah
>>>
>>>
>>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Zerah,
>>>>
>>>> What’s the type of value_schema? It should be a Java object of type
>>>> Schema. From the exception, it seems that it’s a class instead of object.
>>>> Is this true?
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>>>>
>>>> Hi Dian,
>>>>
>>>> Thanks for your suggestion.
>>>>
>>>> I tried to invoke the
>>>> ConfluentRegistryAvroDeserializationSchema.forGeneric method from Python,
>>>> but it's not working. Kindly check the code snippet below:
>>>>
>>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>>>
>>>>     def __init__(self, record_class: str = None, avro_schema_string=None,
>>>>                  url: str = None):
>>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>>             .org.apache.flink.formats.avro.registry.confluent \
>>>>             .ConfluentRegistryAvroDeserializationSchema
>>>>         j_deserialization_schema = \
>>>>             JConfluentAvroRowDeserializationSchema.forGeneric(
>>>>                 avro_schema_string, url)
>>>>         super(MyAvroRowDeserializationSchema, self).__init__(
>>>>             j_deserialization_schema)
>>>>
>>>>
>>>> FlinkKafkaConsumer is now invoked as below using
>>>> MyAvroRowDeserializationSchema:
>>>>
>>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>>> schema_url = "http://host:port"
>>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>>     avro_schema_string=value_schema, url=schema_url)
>>>> kafka_source = FlinkKafkaConsumer(
>>>>     topics='my_topic',
>>>>     deserialization_schema=deserialization_schema,
>>>>     properties={'bootstrap.servers': 'host:port',
>>>>                 'group.id': 'test_group'})
>>>>
>>>> I'm getting the below error:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>>     args_command, temp_args = self._build_args(*args)
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>>>
>>>>
>>>>
>>>> Please suggest how this method should be called. The schema used here
>>>> is an Avro schema.
>>>>
>>>> Regards,
>>>> Zerah
>>>>
>>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> Hi Zerah,
>>>>>
>>>>> I guess you could provide a Python implementation for
>>>>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>>>>> for the Java implementation, so it will be very easy to implement. You
>>>>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
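>>>>>
>>>>> As a rough sketch of such a wrapper (untested; it assumes the
>>>>> flink-sql-avro-confluent-registry jar is on the classpath and that the
>>>>> schema argument is a Java org.apache.avro.Schema object):
>>>>>
>>>>> ```
>>>>> from pyflink.common.serialization import DeserializationSchema
>>>>> from pyflink.java_gateway import get_gateway
>>>>>
>>>>> class ConfluentAvroDeserializationSchema(DeserializationSchema):
>>>>>
>>>>>     def __init__(self, avro_schema, url):
>>>>>         # Wrap the Java ConfluentRegistryAvroDeserializationSchema
>>>>>         j_schema = get_gateway().jvm.org.apache.flink.formats.avro \
>>>>>             .registry.confluent.ConfluentRegistryAvroDeserializationSchema
>>>>>         super(ConfluentAvroDeserializationSchema, self).__init__(
>>>>>             j_schema.forGeneric(avro_schema, url))
>>>>> ```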
>>>>>
>>>>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I have below use case
>>>>> >
>>>>> > 1. Read streaming data from Kafka topic using Flink Python API
>>>>> > 2. Apply transformations on the data stream
>>>>> > 3. Write back to different kafka topics based on the incoming data
>>>>> >
>>>>> > Input data is coming from a Confluent Avro producer. Using the existing
>>>>> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable to
>>>>> > deserialize the data.
>>>>> >
>>>>> > Please help me process this data, as
>>>>> > ConfluentRegistryAvroDeserializationSchema is not available in the
>>>>> > Python API.
>>>>> >
>>>>> > Regards,
>>>>> > Zerah
>>>>>
>>>>>
>>>>
>>>
