Hi,
I have a table backed by the Confluent Avro format, and the schema that
Flink generates for it looks like the following:
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },
    (snip)
}
I also have another application that reads the Avro schema from the
schema registry. Unfortunately, that application fails with the following
traceback:
Traceback (most recent call last):
  File "/usr/local/bin/datahub", line 8, in <module>
    sys.exit(datahub())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
    pipeline.run()
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
    for wu in self.source.get_workunits():
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
    mce = self._extract_record(t)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
    return SchemaFromJSONData(json_data, names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
    return parser(json_data, names=names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
    return RecordSchema(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
    super(RecordSchema, self).__init__(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
    names.Register(self)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
    raise SchemaParseException(
avro.schema.SchemaParseException: record is a reserved type name.
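The record's full name in the generated schema is `record`
(`"name": "record"`), but `record` is also the name of one of Avro's
complex types, so the Python avro parser rejects it as a reserved type
name. See
https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399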
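To make the collision concrete, here is a minimal sketch that scans a schema
for named types whose name is also an Avro type name. It uses only the
standard `json` module, not the avro library. The function name
`find_reserved_names` and the `RESERVED_AVRO_NAMES` set are my own
illustration: the set is taken from the primitive and complex type names in
the Avro specification, and the exact list the Python avro library checks may
differ.

```python
import json

# Assumption: primitive and complex type names from the Avro specification.
# The reserved list inside the Python avro library may not match exactly.
RESERVED_AVRO_NAMES = {
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "array", "map", "union", "fixed",
}


def find_reserved_names(schema_json):
    """Return names of record/enum/fixed types that collide with Avro type names."""
    found = []

    def walk(node):
        if isinstance(node, dict):
            is_named_type = node.get("type") in ("record", "enum", "fixed")
            if is_named_type and node.get("name") in RESERVED_AVRO_NAMES:
                found.append(node["name"])
            # Recurse into nested fields, unions, and item types.
            for value in node.values():
                walk(value)
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(json.loads(schema_json))
    return found


# Trimmed copy of the generated schema; the snipped fields are left out.
flink_schema = json.dumps({
    "type": "record",
    "name": "record",
    "fields": [
        {
            "name": "dt",
            "type": ["null", {"type": "int", "logicalType": "date"}],
            "default": None,
        },
    ],
})

print(find_reserved_names(flink_schema))
```

For the trimmed schema this reports the top-level record name as the only
collision; the `dt` field and its `date` logical type are not involved.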
So I wonder whether I can set or change the name of the Avro record that
Flink generates, in order to avoid this parse exception.
Thanks,
Youngwoo