Hi Zerah,
Sorry for the late response. I agree with your analysis. Currently, to be used in
the Python DataStream API, we have to provide a Java implementation which could
produce Row instead of GenericRecord. As far as I know, there is currently
still no built-in DeserializationSchema which could produce Row.
Hi Dian,
On providing a Python implementation for
ConfluentRegistryAvroDeserializationSchema, I could deserialize and print
the Confluent Avro data using PyFlink. But since the GenericRecord returned
by ConfluentRegistryAvroDeserializationSchema is not supported in PyFlink
currently, I cannot per
Hi Dian,
Thanks for your support.
I could deserialize the Confluent Avro data using
ConfluentRegistryAvroDeserializationSchema, but since the GenericRecord
returned by ConfluentRegistryAvroDeserializationSchema is not supported in
PyFlink currently, I am unable to proceed.
I can print the datastream
Thanks Dian. It worked for me
Regards,
Zerah
On Wed, May 19, 2021, 5:14 PM Dian Fu wrote:
> Hi Zerah,
>
> You could try to replace
> ```
> value_schema = avro.schema.parse(value_schema_str)
> ```
>
> with the following code:
> ```
> JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
> value_schema
Hi Zerah,
You could try to replace
```
value_schema = avro.schema.parse(value_schema_str)
```
with the following code:
```
JSchemaParser = get_gateway().jvm.org.apache.avro.Schema.Parser
value_schema = JSchemaParser().parse(value_schema_str)
```
The reason is that ```value_schema = avro.schema.parse(value_schema_str)``` will
return a Python object of type avro.schema.RecordSchema, while a Java object of
type org.apache.avro.Schema is needed here.
Hi Dian,
Type of value_schema is <class 'avro.schema.RecordSchema'>
I have only a JSON schema string and the schema registry URL. Please find the
snippet below:
import avro.schema
value_schema_str = """
{
"namespace": "com.nextgen.customer",
"type": "record",
"name": "employee"
Hi Zerah,
What’s the type of value_schema? It should be a Java object of type Schema.
From the exception, it seems that it’s a class instead of an object. Is this true?
Regards,
Dian
> On May 19, 2021, at 3:41 PM, Zerah J wrote:
>
> Hi Dian,
>
> Thanks for your suggestion.
>
> I tried to invoke ConfluentRe
Hi Dian,
Thanks for your suggestion.
I tried to invoke the ConfluentRegistryAvroDeserializationSchema.forGeneric
method from Python, but it's not working. Kindly check the code snippet
below:
class MyAvroRowDeserializationSchema(DeserializationSchema):
def __init__(self, record_class: str = N
Hi Zerah,
I guess you could provide a Python implementation for
ConfluentRegistryAvroDeserializationSchema if needed. It’s just a wrapper for
the Java implementation, so it will be very easy to implement. You could
take a look at AvroRowDeserializationSchema [1] as an example.
Regards,
Dian
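The "thin wrapper" pattern Dian describes can be sketched roughly as below.
This is not runnable PyFlink code: `FakeGateway` and `FakeJavaSchema` are
stand-ins for the real py4j gateway (`pyflink.java_gateway.get_gateway()`) and
the Java deserialization schema, and the class name is made up. The point is
only that the Python class constructs and holds a reference to the underlying
Java object, and all real deserialization happens on the Java side:

```python
# Sketch of the wrapper pattern used by PyFlink's serialization schemas.
# Everything named Fake* is a stand-in so the sketch runs without a JVM.

class FakeJavaSchema:
    """Stand-in for the Java ConfluentRegistryAvroDeserializationSchema."""
    def __init__(self, schema_str, registry_url):
        self.schema_str = schema_str
        self.registry_url = registry_url


class FakeGateway:
    """Stand-in for the py4j gateway PyFlink uses to reach the JVM."""
    @staticmethod
    def for_generic(schema_str, registry_url):
        return FakeJavaSchema(schema_str, registry_url)


class MyConfluentAvroDeserializationSchema:
    """The Python side is only a holder for the Java object.

    In real code the constructor would do something like:
        gateway = get_gateway()
        j_schema = gateway.jvm.org.apache.avro.Schema.Parser().parse(schema_str)
        j_deser = ...ConfluentRegistryAvroDeserializationSchema.forGeneric(
            j_schema, registry_url)
    """
    def __init__(self, schema_str, registry_url):
        self._j_deserialization_schema = FakeGateway.for_generic(
            schema_str, registry_url)


wrapper = MyConfluentAvroDeserializationSchema(
    '{"type": "string"}', "http://localhost:8081")
print(wrapper._j_deserialization_schema.registry_url)
```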
Hi,
I have the below use case:
1. Read streaming data from a Kafka topic using the Flink Python API
2. Apply transformations on the data stream
3. Write back to different Kafka topics based on the incoming data
Input data is coming from a Confluent Avro producer. By using the existing
pyflink.common.serializ
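The routing decision in step 3 can be sketched independently of Flink. The
field name `event_type` and the topic names below are hypothetical, not from
the original thread; in a real PyFlink job this logic would typically live in
the Kafka sink's serialization schema or in a stream split:

```python
# Hypothetical routing rule for step 3: pick an output Kafka topic
# based on a field of the incoming (already deserialized) record.

def route_to_topic(record: dict) -> str:
    """Return the target topic name for one record (illustrative names)."""
    routes = {
        "created": "employee-created",
        "updated": "employee-updated",
    }
    # Fall back to a catch-all topic for unknown event types.
    return routes.get(record.get("event_type"), "employee-unrouted")


print(route_to_topic({"event_type": "created"}))  # employee-created
print(route_to_topic({"event_type": "deleted"}))  # employee-unrouted
```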