Hello everyone,

An update to my question:

I wrote a test (code below). I want to set a default value for the "address"
field of "person".

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("address")
                .field("province", SchemaBuilder.STRING_SCHEMA)
                .field("city", SchemaBuilder.STRING_SCHEMA);
        Struct defaultValue = new Struct(schemaBuilder.build())
                .put("province", "aaaa")
                .put("city", "bbbb");
        Schema dataSchema = SchemaBuilder.struct().name("person")
                .field("address", schemaBuilder.defaultValue(defaultValue).build())
                .build();
        Struct struct = new Struct(dataSchema);
        System.out.println(struct.toString());

I got the exception below:

Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)

Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 1 more

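I dug into the code of ConnectSchema.validateValue and found that when the
type is STRUCT, it compares the struct's schema with the expected schema using
equals, which checks the class of each schema. Here one is a SchemaBuilder and
the other is a ConnectSchema, so they never match: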

         case STRUCT:
                Struct struct = (Struct) value;
                if (!struct.schema().equals(schema))
                    throw new DataException("Struct schemas do not match.");
                struct.validate();
                break;

The equals method is:

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ConnectSchema schema = (ConnectSchema) o;
        return Objects.equals(optional, schema.optional) &&
                Objects.equals(version, schema.version) &&
                Objects.equals(name, schema.name) &&
                Objects.equals(doc, schema.doc) &&
                Objects.equals(type, schema.type) &&
                Objects.deepEquals(defaultValue, schema.defaultValue) &&
                Objects.equals(fields, schema.fields) &&
                Objects.equals(keySchema, schema.keySchema) &&
                Objects.equals(valueSchema, schema.valueSchema) &&
                Objects.equals(parameters, schema.parameters);
    }
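
To illustrate the getClass() check in isolation, here is a minimal sketch that
does not use the Connect classes at all. SimpleSchema, BuiltSchema and Builder
are hypothetical stand-ins I made up for Schema, ConnectSchema and
SchemaBuilder; the only thing copied from the real code is the shape of
equals. It suggests why a built schema would never equal the builder it came
from, even when both describe the same fields:

```java
import java.util.Objects;

public class SchemaEqualsSketch {
    /** Hypothetical stand-in for the Schema interface. */
    interface SimpleSchema {
        String name();
    }

    /** Hypothetical stand-in for a built schema (the ConnectSchema role). */
    static final class BuiltSchema implements SimpleSchema {
        private final String name;

        BuiltSchema(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            // Same strict class check as in the equals method quoted above:
            // any object of a different class is rejected before fields are compared.
            if (o == null || getClass() != o.getClass()) return false;
            return Objects.equals(name, ((BuiltSchema) o).name);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(name);
        }
    }

    /** Hypothetical stand-in for a builder that is also usable as a schema. */
    static final class Builder implements SimpleSchema {
        private final String name;

        Builder(String name) {
            this.name = name;
        }

        @Override
        public String name() {
            return name;
        }

        BuiltSchema build() {
            return new BuiltSchema(name);
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder("address");
        BuiltSchema built = builder.build();

        // Built schema vs. the builder itself: different classes.
        System.out.println(built.equals(builder));         // false
        // Built schema vs. another built schema with the same content.
        System.out.println(built.equals(builder.build())); // true
    }
}
```

In my test the Struct holds the result of build(), while defaultValue() is
called on the builder, which matches the first comparison in this sketch.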

Can anyone help with how to set a default value for a STRUCT type using the
Connect API?

Thanks

Best,
Lisheng


Lisheng Wang <wanglishen...@gmail.com> wrote on Monday, January 6, 2020 at 3:31 PM:

> Hello Kafka devs,
>
> I'm having trouble figuring out how to set a default value for a struct.
>
> I'm following https://docs.confluent.io/current/connect/devguide.html
>
> Schema schema = SchemaBuilder.struct().name(NAME)
>     .field("name", Schema.STRING_SCHEMA)
>     .field("age", Schema.INT32_SCHEMA)
>     .field("admin", SchemaBuilder.bool().defaultValue(false).build())
>     .build();
>
> Struct struct = new Struct(schema)
>     .put("name", "Barbara Liskov")
>     .put("age", 75);
>
> Below is my code. I don't know how to set a default value when the schema
> type is struct:
>
> Schema schema1 = SchemaBuilder.struct().name("info")
>                 .field("address", Schema.STRING_SCHEMA)
>                 .field("code", Schema.STRING_SCHEMA)
>                 .defaultValue("").build();
>
> Thanks!
>
> Best,
> Lisheng
>
