Arvid, I found a Jira issue related to mine: https://issues.apache.org/jira/browse/FLINK-18096
I added a comment there. I think Seth's idea is much better than just renaming the record in the generated Avro schema.
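Renaming the record, as mentioned above, can also be done on the consumer side before the schema string reaches the Avro parser. This is a minimal Python sketch under stated assumptions. It does not describe Flink's or DataHub's behavior. `rename_top_level_record` and the default replacement name `FlinkRecord` are hypothetical. The reserved-name set is assumed from the Avro spec's type names. The consumer is assumed to need only the field list, not to decode data with this schema.

```python
import json

# Assumption: type names a parser refuses as record names. This roughly
# mirrors the primitive and complex type names from the Avro spec; a given
# Avro library may use a slightly different list.
RESERVED_TYPE_NAMES = {
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "array", "map", "fixed", "union",
}


def rename_top_level_record(schema_str: str, new_name: str = "FlinkRecord") -> str:
    """Hypothetical helper: return the Avro schema JSON with its top-level
    record renamed if the bare name clashes with a reserved type name.

    Only the top-level "name" is changed. Field names and nested records are
    left alone, and a name with an explicit namespace is not touched because
    its full name is no longer a bare type name.
    """
    schema = json.loads(schema_str)
    if (
        isinstance(schema, dict)
        and schema.get("type") == "record"
        and schema.get("name") in RESERVED_TYPE_NAMES
        and "namespace" not in schema
    ):
        schema["name"] = new_name
    return json.dumps(schema)


if __name__ == "__main__":
    # Shape of the schema Flink generated in this thread (fields trimmed).
    flink_schema = (
        '{"type": "record", "name": "record", "fields": [{"name": "dt", '
        '"type": ["null", {"type": "int", "logicalType": "date"}], "default": null}]}'
    )
    print(rename_top_level_record(flink_schema))
```

Renaming changes the record's full name. Record names take part in Avro schema resolution. If the renamed schema were used as a reader schema for data written under the original name, it may need an alias, as Arvid suggested, to match.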
Thanks,
Youngwoo

On Mon, May 17, 2021 at 8:37 PM Youngwoo Kim (김영우) <warwit...@gmail.com> wrote:
> Hey Arvid,
>
> I found that it's a constant in Flink:
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L307
> I believe it would be good to change 'record' to 'Record'.
>
> What do you think?
>
> Thanks,
> Youngwoo
>
>
> On Mon, May 17, 2021 at 8:10 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Youngwoo,
>>
>> You can try to use aliases for it [1].
>>
>> Even better would be to use a different name for the record. In general,
>> since Avro originally comes from the Java world, it's more common to use
>> camel case for record names.
>>
>> [1] https://avro.apache.org/docs/current/spec.html#Aliases
>>
>> On Mon, May 17, 2021 at 11:29 AM Youngwoo Kim (김영우) <yw...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a table backed by the Confluent Avro format, and the schema
>>> generated by Flink looks like the following:
>>>
>>> {
>>>   "type": "record",
>>>   "name": "record",
>>>   "fields": [
>>>     {
>>>       "name": "dt",
>>>       "type": [
>>>         "null",
>>>         {
>>>           "type": "int",
>>>           "logicalType": "date"
>>>         }
>>>       ],
>>>       "default": null
>>>     },
>>>
>>>     (snip)
>>>
>>> }
>>>
>>> I also have another application that reads the Avro schema from the
>>> schema registry.
>>> Unfortunately, I got a traceback from that application:
>>>
>>> Traceback (most recent call last):
>>>   File "/usr/local/bin/datahub", line 8, in <module>
>>>     sys.exit(datahub())
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
>>>     return self.main(*args, **kwargs)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
>>>     rv = self.invoke(ctx)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
>>>     return _process_result(sub_ctx.command.invoke(sub_ctx))
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
>>>     return ctx.invoke(self.callback, **ctx.params)
>>>   File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
>>>     return callback(*args, **kwargs)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
>>>     pipeline.run()
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
>>>     for wu in self.source.get_workunits():
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
>>>     mce = self._extract_record(t)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
>>>     fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
>>>   File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
>>>     parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
>>>     return SchemaFromJSONData(json_data, names)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
>>>     return parser(json_data, names=names)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
>>>     return RecordSchema(
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
>>>     super(RecordSchema, self).__init__(
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
>>>     names.Register(self)
>>>   File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
>>>     raise SchemaParseException(
>>> avro.schema.SchemaParseException: record is a reserved type name.
>>>
>>> The full name of the schema's record is `record` (`"name": "record"`),
>>> but `record` is also the name of one of Avro's complex types. See
>>> https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L399
>>>
>>> So I wonder whether I can set or change the name of the Avro record to
>>> avoid this parse exception.
>>>
>>> Thanks,
>>>
>>> Youngwoo
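The error message suggests that the Python `avro` parser refuses to register a named type whose full name equals an Avro type name. Assuming that is the rule, the following sketch shows two things. First, 'Record' would avoid the clash, because Avro names are case-sensitive. Second, adding a namespace would also avoid it. The helpers `avro_fullname` and `clashes_with_type_name` are hypothetical, and the type-name set is taken from the Avro spec. The sketch models only the full-name comparison, not every naming rule in the spec. It also ignores namespace inheritance from enclosing schemas.

```python
from typing import Optional

# Assumption: names a parser refuses to register for a named type, taken
# from the Avro spec's primitive and complex type names.
AVRO_TYPE_NAMES = frozenset({
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "array", "map", "fixed", "union",
})


def avro_fullname(name: str, namespace: Optional[str] = None) -> str:
    """Full name of a named Avro type, following the spec: a name that
    contains a dot is already a full name (any namespace is ignored);
    otherwise a non-empty namespace is prepended with a dot.
    Namespace inheritance from an enclosing schema is not modeled here."""
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def clashes_with_type_name(name: str, namespace: Optional[str] = None) -> bool:
    """True if the full name is exactly an Avro type name.
    The comparison is case-sensitive, as Avro names are."""
    return avro_fullname(name, namespace) in AVRO_TYPE_NAMES


if __name__ == "__main__":
    for name, ns in [("record", None), ("Record", None), ("record", "org.example")]:
        print(avro_fullname(name, ns), clashes_with_type_name(name, ns))
```

For example, `clashes_with_type_name("record")` is `True`, while `clashes_with_type_name("Record")` and `clashes_with_type_name("record", "org.example")` are `False`. Here `org.example` is an arbitrary placeholder namespace.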