Hi Dian,

The type of value_schema is <class 'avro.schema.RecordSchema'>.

I only have a JSON schema string and the Schema Registry URL. Please find
the snippet below:

import avro.schema

value_schema_str = """
    {
      "namespace": "com.nextgen.customer",
      "type": "record",
      "name": "employee",
      "doc": "Customer Details Value Schema.",
      "fields": [
        {
          "doc": "String Value",
          "name": "emp_name",
          "type": "string"
        },
        {
          "doc": "String Value",
          "name": "emp_id",
          "type": "string"
        }
      ]
    }
"""
value_schema = avro.schema.parse(value_schema_str)
schema_url = "http://host:port"


How can I create a Java Schema object from this schema string and pass it
to the Java method from Python?
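
One approach I'm considering is to parse the schema string on the JVM side,
so that py4j hands a real Java org.apache.avro.Schema object to forGeneric
(a sketch, assuming the Avro classes are on the Flink classpath and that
py4j resolves the nested Schema.Parser class):

from pyflink.java_gateway import get_gateway

gateway = get_gateway()

# Parse the JSON schema string inside the JVM: the result is a py4j proxy
# for a Java org.apache.avro.Schema rather than a Python
# avro.schema.RecordSchema, so it can be passed to Java methods.
j_value_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(value_schema_str)

Would this be the right way to construct the Java object?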


Regards,
Zerah


On Wed, May 19, 2021 at 1:57 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Zerah,
>
> What’s the type of value_schema? It should be a Java object of type
> Schema. From the exception, it seems that it’s a class instead of an object.
> Is this true?
>
> Regards,
> Dian
>
> On May 19, 2021, at 3:41 PM, Zerah J <connectze...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks for your suggestion.
>
> I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
> method from Python, but it's not working. Kindly check the code snippet
> below:
>
> class MyAvroRowDeserializationSchema(DeserializationSchema):
>
>     def __init__(self, record_class: str = None, avro_schema_string=None,
>                  url: str = None):
>         JConfluentAvroRowDeserializationSchema = get_gateway().jvm \
>             .org.apache.flink.formats.avro.registry.confluent \
>             .ConfluentRegistryAvroDeserializationSchema
>         j_deserialization_schema = \
>             JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>         super(MyAvroRowDeserializationSchema, self).__init__(j_deserialization_schema)
>
>
> FlinkKafkaConsumer is now invoked as below using
> MyAvroRowDeserializationSchema:
>
> value_schema = avro.schema.parse(<reader schema goes here>)
> schema_url = "http://host:port"
> deserialization_schema = \
>     MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
> kafka_source = FlinkKafkaConsumer(
>     topics='my_topic',
>     deserialization_schema=deserialization_schema,
>     properties={'bootstrap.servers': 'host:port', 'group.id': 'test_group'})
>
> I'm getting the error below:
>
> Traceback (most recent call last):
>   File "flinkKafkaAvro.py", line 70, in datastream_api_demo
>     deserialization_schema = MyAvroRowDeserializationSchema(avro_schema_string=value_schema, url=schema_url)
>   File "test_env/tests/SerializeAvroSchema.py", line 52, in __init__
>     j_deserialization_schema = JConfluentAvroRowDeserializationSchema.forGeneric(avro_schema_string, url)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1277, in __call__
>     args_command, temp_args = self._build_args(*args)
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in _build_args
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/java_gateway.py", line 1247, in <listcomp>
>     [get_command_part(arg, self.pool) for arg in new_args])
>   File "test_env/venv/lib64/python3.7/site-packages/py4j/protocol.py", line 298, in get_command_part
>     command_part = REFERENCE_TYPE + parameter._get_object_id()
> AttributeError: 'RecordSchema' object has no attribute '_get_object_id'
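>
> Reading the last frame, it looks like py4j can only pass primitives and
> proxies of Java objects to a Java method; the RecordSchema returned by
> avro.schema.parse is a pure-Python object with no _get_object_id, so it
> cannot cross the gateway. A quick check (a sketch, using value_schema from
> my snippet above):
>
> print(type(value_schema))                        # <class 'avro.schema.RecordSchema'>
> print(hasattr(value_schema, '_get_object_id'))   # False -- not a py4j Java proxy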
>
>
>
> Please suggest how this method should be called. The schema used here is
> an Avro schema.
>
> Regards,
> Zerah
>
> On Mon, May 17, 2021 at 3:17 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Zerah,
>>
>> I guess you could provide a Python implementation for
>> ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper
>> for the Java implementation, so it will be very easy to implement. You
>> could take a look at AvroRowDeserializationSchema [1] as an example.
>>
>> Regards,
>> Dian
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.13/flink-python/pyflink/common/serialization.py#L303
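>>
>> A minimal sketch of what such a wrapper could look like (the class name is
>> illustrative and it assumes the flink-avro and Avro jars are on the
>> classpath; the schema string is parsed on the JVM side so that py4j passes
>> a Java object rather than a Python one):
>>
>> from pyflink.common.serialization import DeserializationSchema
>> from pyflink.java_gateway import get_gateway
>>
>> class ConfluentAvroDeserializationSchema(DeserializationSchema):
>>
>>     def __init__(self, avro_schema_string: str, url: str):
>>         gateway = get_gateway()
>>         # Parse the JSON schema string inside the JVM to get a Java
>>         # org.apache.avro.Schema proxy that py4j can hand to forGeneric.
>>         j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(avro_schema_string)
>>         j_deserialization_schema = gateway.jvm \
>>             .org.apache.flink.formats.avro.registry.confluent \
>>             .ConfluentRegistryAvroDeserializationSchema.forGeneric(j_schema, url)
>>         super().__init__(j_deserialization_schema)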
>>
>> > On May 17, 2021, at 5:35 PM, Zerah J <connectze...@gmail.com> wrote:
>> >
>> > Hi,
>> >
>> > I have below use case
>> >
>> > 1. Read streaming data from a Kafka topic using the Flink Python API
>> > 2. Apply transformations on the data stream
>> > 3. Write back to different Kafka topics based on the incoming data
>> >
>> > Input data is coming from a Confluent Avro producer. Using the existing
>> > pyflink.common.serialization.AvroRowDeserializationSchema, I'm unable
>> > to deserialize the data.
>> >
>> > Please help with processing the data, as
>> > ConfluentRegistryAvroDeserializationSchema is not available in the
>> > Python API.
>> >
>> > Regards,
>> > Zerah
>>
>>
>
