Arvid,

I found a JIRA issue related to my problem:
https://issues.apache.org/jira/browse/FLINK-18096
I added a comment there. I think Seth's idea is much better than just
renaming the record in the generated Avro schema.
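
Until that is resolved, a consumer that only needs the field list could
rewrite the top-level record name before handing the schema string to its
parser. The following is only a minimal sketch of that stopgap, not what the
JIRA proposes. The names `rename_reserved_record` and `AVRO_TYPE_NAMES` and the
default replacement name `Record` are made up for illustration, and the set of
names that a given parser actually rejects may differ from the one listed here.

```python
import json

# Primitive and complex type names from the Avro spec. Assumption: the exact
# set a particular parser treats as reserved may be larger or smaller.
AVRO_TYPE_NAMES = frozenset({
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "array", "map", "union", "fixed",
})


def rename_reserved_record(schema_str, new_name="Record"):
    """Return the schema JSON with a reserved top-level record name replaced.

    Only the outermost record is inspected; nested records are left alone.
    """
    schema = json.loads(schema_str)
    if (
        isinstance(schema, dict)
        and schema.get("type") == "record"
        and schema.get("name") in AVRO_TYPE_NAMES
    ):
        schema["name"] = new_name
    return json.dumps(schema)
```

The result would be passed to the parser in place of the original schema
string. Because this changes the record's name, it is probably only suitable
where the name itself does not matter, such as reading field metadata.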

Thanks,
Youngwoo



On Mon, May 17, 2021 at 8:37 PM Youngwoo Kim (김영우) <warwit...@gmail.com>
wrote:

> Hey Arvid,
>
> I found that it's a constant in Flink:
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L307
> I believe it would be good to replace 'record' with 'Record'.
>
> What do you think?
>
> Thanks,
> Youngwoo
>
>
> On Mon, May 17, 2021 at 8:10 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Youngwoo,
>>
>> You can try to use aliases for it [1].
>>
>> Even better would be to use a different name for the record. In general,
>> since Avro originally comes from the Java world, it's more common to use
>> upper camel case for record names.
>>
>> [1] https://avro.apache.org/docs/current/spec.html#Aliases
>>
>> On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <yw...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a table backed by the Confluent Avro format, and the schema that
>>> Flink generates looks like the following:
>>>
>>> {
>>>   "type": "record",
>>>   "name": "record",
>>>   "fields": [
>>>     {
>>>       "name": "dt",
>>>       "type": [
>>>         "null",
>>>         {
>>>           "type": "int",
>>>           "logicalType": "date"
>>>         }
>>>       ],
>>>       "default": null
>>>     },
>>>
>>> (snip)
>>>
>>> }
>>>
>>> I also have another application that reads the Avro schema from the
>>> schema registry. Unfortunately, that application fails with this traceback:
>>>
>>> Traceback (most recent call last):
>>>   File "/usr/local/bin/datahub", line 8, in <module>
>>>     sys.exit(datahub())
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
>>>     return self.main(*args, **kwargs)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
>>>     rv = self.invoke(ctx)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
>>>     return _process_result(sub_ctx.command.invoke(sub_ctx))
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
>>>     return ctx.invoke(self.callback, **ctx.params)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
>>>     return callback(*args, **kwargs)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
>>>     pipeline.run()
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
>>>     for wu in self.source.get_workunits():
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
>>>     mce = self._extract_record(t)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
>>>     fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
>>>     parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
>>>     return SchemaFromJSONData(json_data, names)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
>>>     return parser(json_data, names=names)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
>>>     return RecordSchema(
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
>>>     super(RecordSchema, self).__init__(
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
>>>     names.Register(self)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
>>>     raise SchemaParseException(
>>> avro.schema.SchemaParseException: record is a reserved type name.
>>>
>>>
>>>
>>> The schema's record is named `record` (`"name": "record"`), but `record`
>>> is also the name of one of Avro's complex types, so the parser rejects it.
>>> See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399
>>>
>>>
>>> So I wonder whether I can set or change the name of the Avro record to
>>> avoid this parse exception.
>>>
>>>
>>> Thanks,
>>>
>>> Youngwoo
>>>
>>
