我使用注册kafka topic对应的schema到confluent schema registry时报错,想知道问题的原因是什么?如何fix?
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Schema being registered is incompatible with an earlier schema for subject
"rtdp_test-test_schema-value", details:
[{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field
'salary' at path '/fields/10/type/fields/5' in the new schema has no default
value and is missing in the old schema', additionalInfo:'salary'},
{errorType:'READER_FIELD_MISSING_DEFAULT_VALUE', description:'The field
'salary' at path '/fields/11/type/fields/5' in the new schema has no default
value and is missing in the old schema', additionalInfo:'salary'},
{oldSchemaVersion: 4}, {oldSchema:
'{"type":"record","name":"Envelope","namespace":"rtdp_test-test_schema","fields":[{"name":"database","type":"string"},{"name":"es","type":"long"},{"name":"id","type":"int"},{"name":"isDdl","type":"boolean"},{"name":"sql","type":"string"},{"name":"table","type":"string"},{"name":"ts","type":"long"},{"name":"type","type":"string"},{"name":"pkNames","type":{"type":"array","items":"string"}},{"name":"data","type":[{"type":"array","items":{"type":"record","name":"Value","fields":[{"name":"id","type":["long","null"],"default":0},{"name":"create_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"update_time","type":{"type":"long","logicalType":"timestamp-millis"},"default":0},{"name":"name","type":["string","null"],"default":""},{"name":"gender","type":["string","null"],"default":""}]}},"null"]},{"name":"mysqlType","type":{"type":"record","name":"mysqlType","fields":[{"name":"id","type":"string"},{"name":"create_time","type":"string"},{"name":"update_time","type":"string"},{"name":"name","type":"string"},{"name":"gender","type":"string"}]}},{"name":"sqlType","type":{"type":"record","name":"sqlType","fields":[{"name":"id","type":"int"},{"name":"create_time","type":"int"},{"name":"update_time","type":"int"},{"name":"name","type":"int"},{"name":"gender","type":"int"}]}},{"name":"old","type":[{"type":"array","items":"Value"},"null"]}]}'},
{validateFields: 'false', compatibility: 'BACKWARD'}]; error code: 409
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
maven依赖:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.3.1</version>
</dependency>
java代码:
Schema decimalSchema = LogicalTypes.decimal(precision,
scale).addToSchema(SchemaBuilder.builder().bytesType());
data =
data.name(columnName).type().unionOf().nullType().and().type(decimalSchema).endUnion().nullDefault();
salary字段是decimal类型,报错是说之前有一个不带salary字段版本的schema,而新版本schema里该salary字段定义中缺少default
value,可我明明设置了nullDefault呀,这一点从生成的avro schema json string也可验证:
{
"type": "record",
"name": "Envelope",
"namespace": "rtdp_test-test_schema",
"fields": [
{
"name": "database",
"type": "string"
},
{
"name": "es",
"type": "long"
},
{
"name": "id",
"type": "int"
},
{
"name": "isDdl",
"type": "boolean"
},
{
"name": "sql",
"type": "string"
},
{
"name": "table",
"type": "string"
},
{
"name": "ts",
"type": "long"
},
{
"name": "type",
"type": "string"
},
{
"name": "pkNames",
"type": {
"type": "array",
"items": "string"
}
},
{
"name": "data",
"type": [
{
"type": "array",
"items": {
"type": "record",
"name": "Value",
"fields": [
{
"name": "id",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "create_time",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "update_time",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "gender",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "salary",
"type": [
"null",
{
"type": "bytes",
"logicalType": "decimal",
"precision": 6,
"scale": 2
}
],
"default": null
}
]
}
},
"null"
]
},
{
"name": "mysqlType",
"type": {
"type": "record",
"name": "mysqlType",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "create_time",
"type": "string"
},
{
"name": "update_time",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "gender",
"type": "string"
},
{
"name": "salary",
"type": "string"
}
]
}
},
{
"name": "sqlType",
"type": {
"type": "record",
"name": "sqlType",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "create_time",
"type": "int"
},
{
"name": "update_time",
"type": "int"
},
{
"name": "name",
"type": "int"
},
{
"name": "gender",
"type": "int"
},
{
"name": "salary",
"type": "int"
}
]
}
},
{
"name": "old",
"type": [
{
"type": "array",
"items": "Value"
},
"null"
]
}
]
}