Hi Dian,

Thanks for your support.

I could deserialize the Confluent Avro data using
ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord it
returns is not currently supported in PyFlink, I am unable to proceed.

I can print the datastream using ds.print(). Below is the result:
3> {"name": "abc", "id": "123"}
3> {"name": "cde", "id": "456"}


Apart from this, none of the transformations are working.

Caused by: java.lang.UnsupportedOperationException: The type information:
GenericRecord("{"type":"record","name":"Employee_Details","namespace":"com.employee","doc":"Employee Details Value Schema.","fields":[{"name":"name","type":"string","doc":"String Value"},{"name":"id","type":"string","doc":"String Value"}]}")
is not supported in PyFlink currently.


Is there a way to convert these GenericRecords returned by
ConfluentRegistryAvroDeserializationSchema into Flink Rows, like the
existing AvroRowDeserializationSchema does?
Or please suggest any other way by which I can perform transformations and
write the data to a Kafka topic.
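
For reference, this is the Row-returning behavior I'm comparing against:
the plain AvroRowDeserializationSchema with the same reader schema (a
sketch; value_schema_str is the JSON schema string from my earlier mail,
and this path only works for non-Confluent Avro data, since it cannot skip
the Confluent wire-format header):
```
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Yields Flink Rows, which PyFlink transformations do support, but it
# expects raw Avro bytes rather than the Confluent wire format.
row_schema = AvroRowDeserializationSchema(avro_schema_string=value_schema_str)
row_source = FlinkKafkaConsumer(
    topics='my_topic',
    deserialization_schema=row_schema,
    properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
```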

Regards,
Zerah

On Wed, May 19, 2021 at 7:13 PM Zerah J <connectze...@gmail.com> wrote:

> Thanks Dian. It worked for me
>
> Regards,
> Zerah
>
> On Wed, May 19, 2021, 5:14 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Zerah,
>>
>> You could try to replace
>> ```
>> value_schema = avro.schema.parse(<reader schema goes here>)
>> ```
>>
>> with the following code:
>> ```
>> from pyflink.java_gateway import get_gateway
>>
>> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
>> value_schema = JSchemaParser().parse(value_schema_str)
>> ```
>>
>> The reason is that ```value_schema = avro.schema.parse(<reader schema
>> goes here>)``` will create a Python object instead of a Java object.
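>>
>> As a quick check (a sketch, assuming the snippet above has run):
>> ```
>> from py4j.java_gateway import JavaObject
>>
>> # A py4j proxy for a Java org.apache.avro.Schema passes this check;
>> # a pure-Python avro.schema.RecordSchema does not.
>> assert isinstance(value_schema, JavaObject)
>> ```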
>>
>> Regards,
>> Dian
>>
>> 2021年5月19日 下午5:23,Zerah J <connectze...@gmail.com> 写道:
>>
>> Hi Dian,
>>
>> Type of value_schema is <class 'avro.schema.RecordSchema'>
>>
>> I have only a JSON schema string and the schema registry URL. Please find
>> the snippet below:
>>
>> import avro.schema
>>
>> value_schema_str = """
>>     {
>>       "namespace": "com.nextgen.customer",
>>       "type": "record",
>>       "name": "employee",
>>       "doc": "Customer Details Value Schema.",
>>       "fields": [
>>         {
>>           "doc": "String Value",
>>           "name": "emp_name",
>>           "type": "string"
>>         },
>>         {
>>           "doc": "String Value",
>>           "name": "emp_id",
>>           "type": "string"
>>         }
>>       ]
>>     }
>> """
>> value_schema = avro.schema.parse(value_schema_str)
>> schema_url = "http://host:port"
>>
>>
>> How can I create a Java Schema object from this schema string and pass it
>> from a Python method?
>>
>>
>> Regards,
>> Zerah
>>
>>
>> On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Zerah,
>>>
>>> What’s the type of value_schema? It should be a Java object of type
>>> Schema. From the exception, it seems that it’s a class instead of an
>>> object. Is this true?
>>>
>>> Regards,
>>> Dian
>>>
>>> 2021年5月19日 下午3:41,Zerah J <connectze...@gmail.com> 写道:
>>>
>>> Hi Dian,
>>>
>>> Thanks for your suggestion.
>>>
>>> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
>>> method from Python, but it's not working. Kindly check the code snippet
>>> below:
>>>
>>> class MyAvroRowDeserializationSchema(DeserializationSchema):
>>>
>>>     def __init__(self, record_class: str = None,
>>>                  avro_schema_string=None, url: str = None):
>>>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>>>             .org.apache.flink.formats.avro.registry.confluent \
>>>             .ConfluentRegistryAvroDeserializationSchema
>>>         j_deserialization_schema = \
>>>             JConfluentAvroRowDeserializationSchema.forGeneric(
>>>                 avro_schema_string, url)
>>>         super(MyAvroRowDeserializationSchema,
>>>               self).__init__(j_deserialization_schema)
>>>
>>>
>>> FlinkKafkaConsumer is now invoked as below, using
>>> MyAvroRowDeserializationSchema:
>>>
>>> value_schema = avro.schema.parse(<reader schema goes here>)
>>> schema_url = "http://host:port"
>>> deserialization_schema = MyAvroRowDeserializationSchema(
>>>     avro_schema_string=value_schema, url=schema_url)
>>> kafka_source = FlinkKafkaConsumer(
>>>     topics='my_topic',
>>>     deserialization_schema=deserialization_schema,
>>>     properties={'bootstrap.servers': 'host:port',
>>>                 'group.id': 'test_group'})
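>>>
>>> For context, the source would then be added to the environment as usual
>>> (a sketch; env here is my StreamExecutionEnvironment):
>>>
>>> ds = env.add_source(kafka_source)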
>>>
>>> I'm getting the below error:
>>>
>>> Traceback (most recent call last):
>>>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>>>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>>>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>>>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>>>     args_command, temp_args = self._build_args(*args)
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>>>     [get_command_part(arg, self.pool) for arg in new_args])
>>>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>>>     command_part = REFERENCE_TYPE + parameter._get_object_id()
>>> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
>>>
>>>
>>>
>>> Please suggest how this method should be called. The schema used here is
>>> an Avro schema.
>>>
>>> Regards,
>>> Zerah
>>>
>>> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Zerah,
>>>>
>>>> I guess you could provide a Python implementation for
>>>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>>>> for the Java implementation, so it will be very easy to implement. You
>>>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>>>
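>>>> For illustration, a rough sketch of such a wrapper (untested; it assumes
>>>> the Confluent Avro format jar is on the classpath and the reader schema
>>>> is given as a JSON string):
>>>>
>>>> ```
>>>> from pyflink.common.serialization import DeserializationSchema
>>>> from pyflink.java_gateway import get_gateway
>>>>
>>>> class ConfluentAvroDeserializationSchema(DeserializationSchema):
>>>>
>>>>     def __init__(self, avro_schema_string: str, url: str):
>>>>         jvm = get_gateway().jvm
>>>>         # Parse the JSON schema string into a Java org.apache.avro.Schema
>>>>         j_schema = jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
>>>>         JSchema = jvm.org.apache.flink.formats.avro.registry.confluent \
>>>>             .ConfluentRegistryAvroDeserializationSchema
>>>>         j_deserialization_schema = JSchema.forGeneric(j_schema, url)
>>>>         super(ConfluentAvroDeserializationSchema,
>>>>               self).__init__(j_deserialization_schema)
>>>> ```
>>>>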
>>>> Regards,
>>>> Dian
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
>>>>
>>>> > 2021年5月17日 下午5:35,Zerah J <connectze...@gmail.com> 写道:
>>>> >
>>>> > Hi,
>>>> >
>>>> > I have below use case
>>>> >
>>>> > 1. Read streaming data from Kafka topic using Flink Python API
>>>> > 2. Apply transformations on the data stream
>>>> > 3. Write back to different kafka topics based on the incoming data
>>>> >
>>>> > Input data is coming from a Confluent Avro producer. Using the
>>>> > existing pyflink.common.serialization.AvroRowDeserializationSchema,
>>>> > I'm unable to deserialize the data.
>>>> >
>>>> > Please help me process the data, as
>>>> > ConfluentRegistryAvroDeserializationSchema is not available in the
>>>> > Python API.
>>>> >
>>>> > Regards,
>>>> > Zerah
>>>>
>>>>
>>>
>>
