This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 45d71f8fcdd [improve][client] PIP-420: Supports users implement 
external schemas (#24488)
45d71f8fcdd is described below

commit 45d71f8fcdd58d8715ada7e418f3acd8b43c141a
Author: ran <[email protected]>
AuthorDate: Sun Aug 31 23:55:51 2025 +0800

    [improve][client] PIP-420: Supports users implement external schemas 
(#24488)
---
 conf/broker.conf                                   |   2 +-
 conf/standalone.conf                               |   2 +-
 pip/pip-420.md                                     |   6 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |   3 +-
 .../schema/ExternalSchemaCompatibilityCheck.java   |  50 ++++
 .../schema/validator/SchemaDataValidator.java      |   3 +-
 .../src/main/proto/SchemaRegistryFormat.proto      |   1 +
 .../ExternalSchemaCompatibilityCheckTest.java      |  90 ++++++++
 .../apache/pulsar/schema/ExternalSchemaTest.java   | 255 +++++++++++++++++++++
 .../pulsar/schema/MockExternalJsonSchema.java      | 113 +++++++++
 .../SchemaCompatibilityCheckTest.java              |  92 ++++++++
 .../org/apache/pulsar/client/api/EncodeData.java   |  41 ++++
 .../java/org/apache/pulsar/client/api/Message.java |   9 +
 .../java/org/apache/pulsar/client/api/Schema.java  |  17 ++
 .../client/api/SchemaSerializationException.java   |  16 ++
 .../org/apache/pulsar/common/schema/KeyValue.java  |  79 ++++++-
 .../apache/pulsar/common/schema/SchemaType.java    |  11 +
 .../client/impl/BatchMessageContainerImpl.java     |   4 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   7 +
 .../org/apache/pulsar/client/impl/MessageImpl.java |  39 ++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  15 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |   5 +
 .../client/impl/TypedMessageBuilderImpl.java       |  25 +-
 .../client/impl/schema/KeyValueSchemaImpl.java     |  81 ++++++-
 .../client/impl/schema/KeyValueSchemaTest.java     |   2 +-
 .../SupportVersioningKeyValueSchemaTest.java       |   4 +-
 .../apache/pulsar/common/schema/KeyValueTest.java  |  29 +++
 .../client/impl/schema/KeyValueSchemaInfo.java     |   9 +-
 .../apache/pulsar/common/protocol/Commands.java    |   9 +
 pulsar-common/src/main/proto/PulsarApi.proto       |   2 +
 .../org/apache/pulsar/io/http/HttpSinkTest.java    |   5 +
 31 files changed, 994 insertions(+), 32 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f56077b5006..11270ae081e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -711,7 +711,7 @@ zookeeperSessionExpiredPolicy=reconnect
 systemTopicEnabled=true
 
 # Deploy the schema compatibility checker for a specific schema type to 
enforce schema compatibility check
-schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck
 
 # The schema compatibility strategy is used for system topics.
 # Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, 
FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
diff --git a/conf/standalone.conf b/conf/standalone.conf
index c6be0376ed9..2b205a2b2f6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -546,7 +546,7 @@ brokerClientTlsProtocols=
 systemTopicEnabled=true
 
 # Deploy the schema compatibility checker for a specific schema type to 
enforce schema compatibility check
-schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck
 
 # The schema compatibility strategy is used for system topics.
 # Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, 
FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 2a033fa6f4e..1aa152ba8f1 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -173,7 +173,7 @@ Add a new field `schema_id` to the `MessageMetadata` to 
store the schema ID for
 ```protobuf
 // File `PulsarApi.proto`
 message MessageMetadata {
-    optional bytes schema_id = 31;
+    optional bytes schema_id = 32;
 }
 ```
 
@@ -214,9 +214,9 @@ public interface Message<T> {
    * PIP-420 provides a way to produce messages with external schema,
    * and the schema ID will be set to the message metadata.
    *
-   * @return schema ID of the message if the message is produced with external 
schema.
+   * @return the schema ID if the message is produced with external schema and 
schema ID is set, otherwise empty.
    */
-  byte[] getSchemaId();
+  Optional<byte[]> getSchemaId();
 
 }
 ```
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fd8dce934b9..279ed9ed73e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3268,7 +3268,8 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
             
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
             
"org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck",
-            
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck"
+            
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck",
+            
"org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck"
     );
 
     @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
new file mode 100644
index 00000000000..b5b5b9dece3
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public class ExternalSchemaCompatibilityCheck implements 
SchemaCompatibilityCheck {
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.EXTERNAL;
+    }
+
+    @Override
+    public void checkCompatible(SchemaData from, SchemaData to, 
SchemaCompatibilityStrategy strategy)
+            throws IncompatibleSchemaException {
+        if ((SchemaType.EXTERNAL.equals(from.getType()) || 
SchemaType.EXTERNAL.equals(to.getType()))
+                && !from.getType().equals(to.getType())) {
+            throw new IncompatibleSchemaException("External schema is not 
compatible with the other schema types.");
+        }
+    }
+
+    @Override
+    public void checkCompatible(Iterable<SchemaData> from, SchemaData to, 
SchemaCompatibilityStrategy strategy)
+            throws IncompatibleSchemaException {
+        for (SchemaData fromSchema : from) {
+            checkCompatible(fromSchema, to, strategy);
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index 85b73f53f81..a26cc4434b2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -69,7 +69,8 @@ public interface SchemaDataValidator {
                 break;
             case NONE:
             case BYTES:
-                // `NONE` and `BYTES` schema is not stored
+            case EXTERNAL:
+                // `NONE`, `BYTES` and `EXTERNAL` schema is not stored
                 break;
             case AUTO:
             case AUTO_CONSUME:
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto 
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
index da024ddcad8..431deeaedf1 100644
--- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -45,6 +45,7 @@ message SchemaInfo {
         LOCALTIME = 19;
         LOCALDATETIME = 20;
         PROTOBUFNATIVE = 21;
+        EXTERNAL = 22;
     }
     message KeyValuePair {
         required string key = 1;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
new file mode 100644
index 00000000000..9c0377d1bcb
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.Assert.fail;
+import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ExternalSchemaCompatibilityCheckTest {
+
+    private final ExternalSchemaCompatibilityCheck compatibilityCheck = new 
ExternalSchemaCompatibilityCheck();
+
+    private final SchemaData externalSchemaData = SchemaData.builder()
+            .type(SchemaType.EXTERNAL)
+            .data(new byte[0])
+            .build();
+
+    @DataProvider(name = "otherSchemasProvider")
+    public Object[] otherSchemasProvider() {
+        return new Object[] {
+                SchemaData.builder()
+                        .type(SchemaType.JSON)
+                        .build(),
+                SchemaData.builder()
+                        .type(SchemaType.AVRO)
+                        .build(),
+                SchemaData.builder()
+                        .type(SchemaType.PROTOBUF)
+                        .build(),
+                SchemaData.builder()
+                        .type(SchemaType.PROTOBUF_NATIVE)
+                        .build()
+        };
+    }
+
+    @Test(dataProvider = "otherSchemasProvider")
+    public void testExternalSchemaCompatibilityCheck(SchemaData schemaData) {
+        try {
+            compatibilityCheck.checkCompatible(
+                    schemaData, externalSchemaData, 
SchemaCompatibilityStrategy.FULL);
+            fail("Expected IncompatibleSchemaException not thrown");
+        } catch (IncompatibleSchemaException e) {
+            // Expected exception, as external schema is not compatible with 
the other schemas
+        }
+
+        try {
+            compatibilityCheck.checkCompatible(
+                    externalSchemaData, schemaData, 
SchemaCompatibilityStrategy.FULL);
+            fail("Expected IncompatibleSchemaException not thrown");
+        } catch (IncompatibleSchemaException e) {
+            // Expected exception, as external schema is not compatible with 
the other schemas
+        }
+    }
+
+    @Test
+    public void testExternalSchemaData() {
+        try {
+            SchemaData exSchemaData = SchemaData.builder()
+                    .type(SchemaType.EXTERNAL)
+                    .data(new byte[0])
+                    .build();
+            compatibilityCheck.checkCompatible(
+                    exSchemaData, externalSchemaData, 
SchemaCompatibilityStrategy.FULL);
+        } catch (IncompatibleSchemaException e) {
+            fail("Did not expect IncompatibleSchemaException to be thrown");
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java
new file mode 100644
index 00000000000..652f9add920
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import static 
org.apache.pulsar.schema.MockExternalJsonSchema.MOCK_KEY_SCHEMA_DATA;
+import static org.apache.pulsar.schema.MockExternalJsonSchema.MOCK_SCHEMA_DATA;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test get partitioned topic schema.
+ */
+@Test(groups = "schema")
+public class ExternalSchemaTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        isTcpLookup = true;
+        super.internalSetup();
+
+        admin.clusters().createCluster("test",
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default", 
Sets.newHashSet("test"));
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 1000 * 5)
+    public void testMockExternalSchema() throws Exception {
+        final String topic = "external-schema-topic";
+        MockExternalJsonSchema<Schemas.PersonFour> externalJsonSchema =
+                new MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        @Cleanup
+        Consumer<Schemas.PersonFour> consumer = 
pulsarClient.newConsumer(externalJsonSchema)
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<Schemas.PersonFour> producer = 
pulsarClient.newProducer(externalJsonSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            Schemas.PersonFour person = new Schemas.PersonFour();
+            person.setId(i);
+            person.setName("user-" + i);
+            person.setAge(18);
+            producer.newMessage().value(person).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            Message<Schemas.PersonFour> message = consumer.receive();
+            assertTrue(message.getSchemaId().isPresent());
+            assertEquals(message.getSchemaId().get(), 
MockExternalJsonSchema.MOCK_SCHEMA_ID);
+            assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+            assertNull(message.getValue());
+            Assert.assertNotNull(message);
+        }
+    }
+
+    @Test(timeOut = 1000 * 5)
+    public void testConflictKvSchema() throws Exception {
+        var externalJsonSchema = new 
MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        try {
+            Schema.KeyValue(externalJsonSchema, 
Schema.JSON(Schemas.PersonFour.class), KeyValueEncodingType.SEPARATED);
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("External schema cannot be used 
with other Pulsar struct schema types"));
+        }
+
+        try {
+            Schema.KeyValue(Schema.JSON(Schemas.PersonFour.class), 
externalJsonSchema);
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("External schema cannot be used 
with other Pulsar struct schema types"));
+        }
+
+        Schema.KeyValue(Schema.JSON(Schemas.PersonFour.class), Schema.STRING);
+        Schema.KeyValue(Schema.STRING, Schema.JSON(Schemas.PersonFour.class));
+    }
+
+    @DataProvider(name = "provideKeyValueEncodingType")
+    public Object[][] provideKeyValueEncodingType() {
+        return new Object[][]{
+                {KeyValueEncodingType.SEPARATED},
+                {KeyValueEncodingType.INLINE}
+        };
+    }
+
+    @Test(dataProvider = "provideKeyValueEncodingType", timeOut = 1000 * 5)
+    public void testExternalKeyValueSchema(KeyValueEncodingType encodingType) 
throws Exception {
+        var keySchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class, 
true);
+        var valueSchema = new 
MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        var keyValueSchema = Schema.KeyValue(keySchema, valueSchema, 
encodingType);
+
+        final String topic = "testExternalKeyValueSchema-" + 
encodingType.name();
+        @Cleanup
+        Consumer<KeyValue<Schemas.PersonFour, Schemas.PersonFour>> consumer = 
pulsarClient.newConsumer(keyValueSchema)
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<KeyValue<Schemas.PersonFour, Schemas.PersonFour>> producer = 
pulsarClient.newProducer(keyValueSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            var person = new Schemas.PersonFour();
+            person.setId(i);
+            person.setName("user-" + i);
+            person.setAge(18);
+            producer.newMessage().value(new KeyValue<>(person, person)).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            var message = consumer.receive();
+            assertTrue(message.getSchemaId().isPresent());
+            assertEquals(message.getSchemaId().get(), 
KeyValue.generateKVSchemaId(
+                    MockExternalJsonSchema.MOCK_KEY_SCHEMA_ID, 
MockExternalJsonSchema.MOCK_SCHEMA_ID));
+
+            if (KeyValueEncodingType.INLINE.equals(encodingType)) {
+                ByteBuf buf = Unpooled.wrappedBuffer(message.getData());
+                assertEquals(buf.readInt(), MOCK_KEY_SCHEMA_DATA.length);
+                byte[] data = new byte[MOCK_KEY_SCHEMA_DATA.length];
+                buf.readBytes(data);
+                assertEquals(data, MOCK_KEY_SCHEMA_DATA);
+                assertEquals(buf.readInt(), MOCK_SCHEMA_DATA.length);
+                data = new byte[MOCK_SCHEMA_DATA.length];
+                buf.readBytes(data);
+                assertEquals(data, MOCK_SCHEMA_DATA);
+                assertEquals(buf.readableBytes(), 0);
+            } else {
+                assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+                assertEquals(message.getKeyBytes(), MOCK_KEY_SCHEMA_DATA);
+            }
+
+            assertNotNull(message.getValue());
+            assertNull(message.getValue().getKey());
+            assertNull(message.getValue().getValue());
+        }
+    }
+
+    @Test(dataProvider = "provideKeyValueEncodingType", timeOut = 1000 * 5)
+    public void testExternalSchemaWithPrimitiveSchema(KeyValueEncodingType 
encodingType) throws Exception {
+        var keySchema = Schema.STRING;
+        var valueSchema = new 
MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        var keyValueSchema = Schema.KeyValue(keySchema, valueSchema, 
encodingType);
+
+        final String topic = "testExternalSchemaWithPrimitiveSchema-" + 
encodingType.name();
+        @Cleanup
+        Consumer<KeyValue<String, Schemas.PersonFour>> consumer = 
pulsarClient.newConsumer(keyValueSchema)
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<KeyValue<String, Schemas.PersonFour>> producer = 
pulsarClient.newProducer(keyValueSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            var person = new Schemas.PersonFour();
+            person.setId(i);
+            person.setName("user-" + i);
+            person.setAge(18);
+            producer.newMessage().value(new KeyValue<>("index-" + i, 
person)).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            var message = consumer.receive();
+            assertTrue(message.getSchemaId().isPresent());
+            assertEquals(message.getSchemaId().get(), 
KeyValue.generateKVSchemaId(
+                    new byte[0], MockExternalJsonSchema.MOCK_SCHEMA_ID));
+            var keyBytes = ("index-" + i).getBytes();
+
+            if (KeyValueEncodingType.INLINE.equals(encodingType)) {
+                ByteBuf buf = Unpooled.wrappedBuffer(message.getData());
+                assertEquals(buf.readInt(), keyBytes.length);
+                byte[] data = new byte[keyBytes.length];
+                buf.readBytes(data);
+                assertEquals(data, keyBytes);
+                assertEquals(buf.readInt(), MOCK_SCHEMA_DATA.length);
+                data = new byte[MOCK_SCHEMA_DATA.length];
+                buf.readBytes(data);
+                assertEquals(data, MOCK_SCHEMA_DATA);
+                assertEquals(buf.readableBytes(), 0);
+            } else {
+                assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+                assertEquals(message.getKeyBytes(), keyBytes);
+            }
+
+            assertNotNull(message.getValue());
+            assertEquals(message.getValue().getKey(), "index-" + i);
+            assertNull(message.getValue().getValue());
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
new file mode 100644
index 00000000000..69c74d27ced
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.client.api.EncodeData;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public class MockExternalJsonSchema<T> implements Schema<T> {
+
+    public static final byte[] MOCK_SCHEMA_DATA = new byte[] {1, 2, 3, 4, 5};
+    public static final byte[] MOCK_KEY_SCHEMA_DATA = new byte[] {0, 4, 3, 4, 
8};
+    public static final byte[] MOCK_SCHEMA_ID = new byte[] {2, 3, 7, 8};
+    public static final byte[] MOCK_KEY_SCHEMA_ID = new byte[] {5, 6, 7, 8, 0, 
0, 2};
+
+    private final Class<T> clazz;
+    private final boolean isKey;
+    @Getter
+    private boolean isClosed;
+
+    public MockExternalJsonSchema(Class<T> clazz, boolean isKey) {
+        this.clazz = clazz;
+        this.isKey = isKey;
+    }
+
+    public MockExternalJsonSchema(Class<T> clazz) {
+        this.clazz = clazz;
+        this.isKey = false;
+    }
+
+    @Override
+    public EncodeData encode(String topic, T message) {
+        // the external schema should register schema when encoding the 
message, this is just a mock implementation
+        return new EncodeData(
+                isKey ? MOCK_KEY_SCHEMA_DATA : MOCK_SCHEMA_DATA,
+                isKey ? MOCK_KEY_SCHEMA_ID : MOCK_SCHEMA_ID);
+    }
+
+    @Override
+    public T decode(String topic, ByteBuffer data, byte[] schemaId) {
+        return decode(topic, 
ByteBufUtil.getBytes(Unpooled.wrappedBuffer(data)), schemaId);
+    }
+
+    @Override
+    public T decode(String topic, byte[] data, byte[] schemaId) {
+        byte[] expectedSchemaId = isKey ? MOCK_KEY_SCHEMA_ID : MOCK_SCHEMA_ID;
+        if (!Arrays.equals(schemaId, expectedSchemaId)) {
+            throw new IllegalStateException("Unexpected schema id");
+        }
+        byte[] expectedSchemaData = isKey ? MOCK_KEY_SCHEMA_DATA : 
MOCK_SCHEMA_DATA;
+        if (!Arrays.equals(data, expectedSchemaData)) {
+            throw new IllegalStateException("Unexpected schema data");
+        }
+        // the external schema should retrieve the schema and decoding the 
payload, this is just a mock implementation
+        return null;
+    }
+
+    @Override
+    public byte[] encode(T message) {
+        // the external schema doesn't support this method
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SchemaInfoImpl.builder()
+                .name("")
+                .type(SchemaType.EXTERNAL)
+                .schema(new byte[0])
+                .build();
+    }
+
+    @Override
+    public boolean supportSchemaVersioning() {
+        return true;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        this.isClosed = true;
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public Schema<T> clone() {
+        return new MockExternalJsonSchema<T>(clazz, isKey);
+    }
+
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index b18e2c179b3..b59ae89c0bf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -22,10 +22,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
@@ -37,6 +40,8 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -50,6 +55,7 @@ import 
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.schema.MockExternalJsonSchema;
 import org.apache.pulsar.schema.Schemas;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -630,6 +636,92 @@ public class SchemaCompatibilityCheckTest extends 
MockedPulsarServiceBaseTest {
         }
 
     }
+
+    @Test
+    public void testExternalSchemaTypeCompatibility() throws Exception {
+        String namespace = "test-namespace-" + randomName(16);
+        admin.namespaces().createNamespace(
+                PUBLIC_TENANT + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        NamespaceName namespaceName = NamespaceName.get(PUBLIC_TENANT, 
namespace);
+        
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), 
SchemaCompatibilityStrategy.FULL);
+
+        final String topic = "persistent://" + PUBLIC_TENANT + "/" + namespace 
+ "/testExternalSchemaTypeCompatibility";
+
+        MockExternalJsonSchema<Schemas.PersonThree> externalJsonSchema =
+                new MockExternalJsonSchema<>(Schemas.PersonThree.class);
+
+        Map<String, String> schemaConfigs = new HashMap<>();
+        schemaConfigs.put("schema.registry.url", "http://localhost:8080";);
+
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .build();
+
+        // Existing topic schema is JSON, new schema can't be EXTERNAL
+        Producer<Schemas.PersonThree> producer = client
+                .newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic)
+                .create();
+        producer.close();
+
+        try (Producer<Schemas.PersonThree> ignored = client
+                .newProducer(externalJsonSchema)
+                .topic(topic)
+                .create()) {
+            fail("Should not be able to create producer with incompatible 
schema.");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.IncompatibleSchemaException);
+            assertTrue(e.getMessage().contains(
+                    "Incompatible schema: exists schema type JSON, new schema 
type EXTERNAL"));
+        }
+        try (Consumer<Schemas.PersonThree> ignored = client
+                .newConsumer(externalJsonSchema)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe()) {
+            fail("Should not be able to create consumer with incompatible 
schema.");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.IncompatibleSchemaException);
+            assertTrue(e.getMessage().contains(
+                    "Incompatible schema: exists schema type JSON, new schema 
type EXTERNAL"));
+        }
+        admin.topics().delete(topic);
+
+        producer = client
+                .newProducer(externalJsonSchema)
+                .topic(topic)
+                .create();
+        assertFalse(externalJsonSchema.isClosed());
+        producer.close();
+        assertTrue(externalJsonSchema.isClosed());
+
+        try (Producer<Schemas.PersonThree> ignored = client
+                .newProducer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic)
+                .create()) {
+            fail("Should not be able to create producer with incompatible 
schema.");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.IncompatibleSchemaException);
+            assertTrue(e.getMessage().contains(
+                    "Incompatible schema: exists schema type EXTERNAL, new 
schema type JSON"));
+        }
+        try (Consumer<Schemas.PersonThree> ignored = client
+                .newConsumer(Schema.JSON(Schemas.PersonThree.class))
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe()) {
+            fail("Should not be able to create consumer with incompatible 
schema.");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.IncompatibleSchemaException);
+            assertTrue(e.getMessage().contains(
+                    "Incompatible schema: exists schema type EXTERNAL, new 
schema type JSON"));
+        }
+        admin.topics().delete(topic);
+    }
+
     public static String randomName(int numChars) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < numChars; i++) {
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java
new file mode 100644
index 00000000000..92c35fe7b04
--- /dev/null
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Data to be encoded by an external schema.
+ *
+ * @param data the message payload to be encoded
+ * @param schemaId the schema id return by the schema registry, it can be null 
if not applicable
+ */
+public record EncodeData(byte[] data, byte[] schemaId) {
+
+    public EncodeData(byte[] data) {
+        this(data, null);
+    }
+
+    public boolean hasSchemaId() {
+        return isValidSchemaId(schemaId);
+    }
+
+    public static boolean isValidSchemaId(byte[] schemaId) {
+        return schemaId != null && schemaId.length > 0;
+    }
+
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 3be6a06153b..d76618b1e1d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -208,6 +208,15 @@ public interface Message<T> {
      */
     byte[] getSchemaVersion();
 
+    /**
+     * Get schema ID of the message.
+     * PIP-420 provides a way to produce messages with external schema,
+     * and the schema ID will be set to the message metadata.
+     *
+     * @return the schema ID if the message is produced with external schema 
and schema ID is set, otherwise empty.
+     */
+    Optional<byte[]> getSchemaId();
+
     /**
      * Get the schema associated to the message.
      * Please note that this schema is usually equal to the Schema you passed
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index ef1e0cc1fea..e4ed2a1303d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -28,6 +28,7 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Date;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -74,6 +75,10 @@ public interface Schema<T> extends Cloneable {
      */
     byte[] encode(T message);
 
+    default EncodeData encode(String topic, T message) {
+        return new EncodeData(encode(message));
+    }
+
     /**
      * Returns whether this schema supports versioning.
      *
@@ -134,6 +139,14 @@ public interface Schema<T> extends Cloneable {
         return decode(getBytes(data));
     }
 
+    default T decode(String topic, ByteBuffer data, byte[] schemaId) {
+        return decode(data, schemaId);
+    }
+
+    default T decode(String topic, byte[] data, byte[] schemaId) {
+        return decode(data, schemaId);
+    }
+
     /**
      * Decode a ByteBuffer into an object using a given version. <br/>
      *
@@ -184,6 +197,10 @@ public interface Schema<T> extends Cloneable {
      */
     Schema<T> clone();
 
+    default CompletableFuture<Void> closeAsync() {
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Schema that doesn't perform any encoding on the message payloads. 
Accepts a byte array and it passes it through.
      */
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 0c9b1cd21dd..dfb79866212 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -50,4 +50,20 @@ public class SchemaSerializationException extends 
RuntimeException {
     public SchemaSerializationException(Throwable cause) {
         super(cause);
     }
+
+    /**
+     * Constructs an {@code SchemaSerializationException} with the specified 
detail message and cause.
+     *
+     * @param message
+     *        The detail message (which is saved for later retrieval
+     *        by the {@link #getMessage()} method)
+     * @param cause
+     *        The cause (which is saved for later retrieval by the
+     *        {@link #getCause()} method).  (A null value is permitted,
+     *        and indicates that the cause is nonexistent or unknown.)
+     */
+    public SchemaSerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index 489efc5825a..98725410af9 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.common.schema;
 
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import org.apache.pulsar.client.api.EncodeData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -100,26 +102,79 @@ public class KeyValue<K, V> {
      */
     public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
                                        V value, Schema<V> valueWriter) {
-        byte [] keyBytes;
+        return encode(null, key, keyWriter, value, valueWriter).data();
+    }
+
+    public static <K, V> EncodeData encode(String topic, K key, Schema<K> 
keyWriter,
+                                           V value, Schema<V> valueWriter) {
+        EncodeData keyEncodeData;
         if (key == null) {
-            keyBytes = new byte[0];
+            keyEncodeData = new EncodeData(new byte[0]);
         } else {
-            keyBytes = keyWriter.encode(key);
+            keyEncodeData = keyWriter.encode(topic, key);
         }
 
-        byte [] valueBytes;
+        EncodeData valueEncodeData;
         if (value == null) {
-            valueBytes = new byte[0];
+            valueEncodeData = new EncodeData(new byte[0]);
         } else {
-            valueBytes = valueWriter.encode(value);
+            valueEncodeData = valueWriter.encode(topic, value);
         }
-        ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + 
valueBytes.length);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(
+                4 + keyEncodeData.data().length + 4 + 
valueEncodeData.data().length);
         byteBuffer
-            .putInt(key == null ? -1 : keyBytes.length)
-            .put(keyBytes)
-            .putInt(value == null ? -1 : valueBytes.length)
-            .put(valueBytes);
-        return byteBuffer.array();
+            .putInt(key == null ? -1 : keyEncodeData.data().length)
+            .put(keyEncodeData.data())
+            .putInt(value == null ? -1 : valueEncodeData.data().length)
+            .put(valueEncodeData.data());
+        return new EncodeData(byteBuffer.array(),
+                generateKVSchemaId(keyEncodeData.schemaId(), 
valueEncodeData.schemaId()));
+    }
+
+    /**
+     * Generate a combined schema id for key/value schema.
+     * The format is:
+     * schemaId = schemaKeyLength + keySchemaIdBytes + schemaValueLength + 
valueSchemaIdBytes
+     * where schemaKeyLength and schemaValueLength are 4 bytes integer.
+     * If keySchemaIdBytes or valueSchemaIdBytes is null, the length will be 0.
+     * So the total length of schemaId is:
+     * 4 + keySchemaIdBytes.length + 4 + valueSchemaIdBytes.length
+     *
+     * @param keySchemaId the schema id of key schema
+     * @param valueSchemaId the schema id of value schema
+     */
+    public static byte[] generateKVSchemaId(byte[] keySchemaId, byte[] 
valueSchemaId) {
+        if (!isValidSchemaId(keySchemaId) && !isValidSchemaId(valueSchemaId)) {
+            return null;
+        }
+        keySchemaId = keySchemaId == null ? new byte[0] : keySchemaId;
+        valueSchemaId = valueSchemaId == null ? new byte[0] : valueSchemaId;
+        ByteBuffer buffer = ByteBuffer.allocate(
+                4 + keySchemaId.length + 4 + valueSchemaId.length);
+        buffer
+                .putInt(keySchemaId.length)
+                .put(keySchemaId)
+                .putInt(valueSchemaId.length)
+                .put(valueSchemaId);
+        return buffer.array();
+    }
+
+    public static KeyValue<byte[], byte[]> getSchemaId(byte[] schemaId) {
+        ByteBuffer buffer = ByteBuffer.wrap(schemaId);
+        int keySchemaLength = buffer.getInt();
+        byte[] keySchemaId = new byte[0];
+        if (keySchemaLength > 0) {
+            keySchemaId = new byte[keySchemaLength];
+            buffer.get(keySchemaId);
+        }
+
+        int valueSchemaLength = buffer.getInt();
+        byte[] valueSchemaId = new byte[0];
+        if (valueSchemaLength > 0) {
+            valueSchemaId = new byte[valueSchemaLength];
+            buffer.get(valueSchemaId);
+        }
+        return new KeyValue<>(keySchemaId, valueSchemaId);
     }
 
     /**
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index ac87485abad..af674a85aa5 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -144,6 +144,16 @@ public enum SchemaType {
      */
     PROTOBUF_NATIVE(20),
 
+    /**
+     * External Schema Type.
+     * <p>
+     * This is used to indicate that the schema is managed externally, such as 
in a schema registry.
+     *
+     * External schema type is not compatible with any other schema type.
+     * </p>
+     */
+    EXTERNAL(21),
+
     //
     // Schemas that don't have schema info. the value should be negative.
     //
@@ -202,6 +212,7 @@ public enum SchemaType {
           case 18: return LOCAL_TIME;
           case 19: return LOCAL_DATE_TIME;
           case 20: return PROTOBUF_NATIVE;
+          case 21: return EXTERNAL;
           case -1: return BYTES;
           case -2: return AUTO;
           case -3: return AUTO_CONSUME;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 489a3752332..57946bd7a69 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -366,6 +366,10 @@ class BatchMessageContainerImpl extends 
AbstractBatchMessageContainer {
         if (numMessagesInBatch == 0) {
             return true;
         }
+        if (messageMetadata.hasSchemaId() && msg.getSchemaId().isPresent()) {
+            return Arrays.equals(msg.getSchemaId().get(), 
messageMetadata.getSchemaId())
+                    && Arrays.equals(msg.getSchemaVersion(), 
messageMetadata.getSchemaVersion());
+        }
         if (!messageMetadata.hasSchemaVersion()) {
             return msg.getSchemaVersion() == null;
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e44664491d0..6d5305136b1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1170,6 +1170,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 }
             }));
         }
+        if (schema != null) {
+            closeFutures.add(schema.closeAsync().whenComplete((ignore, ex) -> {
+                if (ex != null) {
+                    log.warn("Exception ignored in closing schema of 
consumer", ex);
+                }
+            }));
+        }
         CompletableFuture<Void> compositeCloseFuture = 
FutureUtil.waitForAll(closeFutures);
 
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index aa98df6cda9..2d27a2ed2c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -424,6 +424,15 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    @Override
+    public Optional<byte[]> getSchemaId() {
+        if (msgMetadata.hasSchemaId()) {
+            byte[] schemaId = msgMetadata.getSchemaId();
+            return (schemaId.length == 0) ? Optional.empty() : 
Optional.of(schemaId);
+        }
+        return Optional.empty();
+    }
+
     private void ensureSchemaIsLoaded() {
         if (schema instanceof AutoConsumeSchema) {
             ((AutoConsumeSchema) 
schema).fetchSchemaIfNeeded(BytesSchemaVersion.of(getSchemaVersion()));
@@ -464,7 +473,11 @@ public class MessageImpl<T> implements Message<T> {
     @Override
     public T getValue() {
         SchemaInfo schemaInfo = getSchemaInfo();
+        var schemaIdOp = getSchemaId();
         if (schemaInfo != null && SchemaType.KEY_VALUE == 
schemaInfo.getType()) {
+            if (schemaIdOp.isPresent()) {
+                return getKeyValueBySchemaId(schemaIdOp.get());
+            }
             if (schema.supportSchemaVersioning()) {
                 return getKeyValueBySchemaVersion();
             } else {
@@ -474,6 +487,9 @@ public class MessageImpl<T> implements Message<T> {
             if (msgMetadata.isNullValue()) {
                 return null;
             }
+            if (schemaIdOp.isPresent()) {
+                return decodeBySchemaId(schemaIdOp.get());
+            }
             // check if the schema passed in from client supports schema 
versioning or not
             // this is an optimization to only get schema version when 
necessary
             return decode(schema.supportSchemaVersioning() ? 
getSchemaVersion() : null);
@@ -514,6 +530,15 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    private T decodeBySchemaId(byte[] schemaId) {
+        try {
+            return schema.decode(topic, getByteBuffer(), schemaId);
+        } catch (Exception e) {
+            throw new SchemaSerializationException("Failed to decode message 
from topic " + topic
+                    + " with schemaId " + 
Base64.getEncoder().encodeToString(schemaId), e);
+        }
+    }
+
     private ByteBuffer getByteBuffer() {
         if (msgMetadata.isNullValue()) {
             return null;
@@ -538,6 +563,20 @@ public class MessageImpl<T> implements Message<T> {
         }
     }
 
+    private T getKeyValueBySchemaId(byte[] schemaId) {
+        if (schema instanceof AutoConsumeSchema) {
+            throw new UnsupportedOperationException("AutoConsumeSchema is not 
supported with schemaId");
+        }
+        if (!(schema instanceof KeyValueSchemaImpl<?, ?> kvSchema)) {
+            throw new IllegalStateException("The schema is not a 
KeyValueSchema");
+        }
+        if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
+            return (T) kvSchema.decode(topic, getKeyBytes(), getData(), 
schemaId);
+        } else {
+            return decodeBySchemaId(schemaId);
+        }
+    }
+
     private T getKeyValue() {
         KeyValueSchemaImpl kvSchema = getKeyValueSchema();
         if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a6987ca11f8..bf749ef2b34 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1203,6 +1203,18 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return State.Closing;
         });
 
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        List<CompletableFuture<Void>> closeTasks = new ArrayList<>();
+        closeTasks.add(closeFuture);
+        if (schema != null) {
+            closeTasks.add(schema.closeAsync().whenComplete((__, t) -> {
+                if (t != null) {
+                    log.warn("Exception ignored in closing schema of 
producer", t);
+                }
+            }));
+        }
+        CompletableFuture<Void> compositeCloseFuture = 
FutureUtil.waitForAll(closeTasks);
+
         if (currentState == State.Closed || currentState == State.Closing) {
             return CompletableFuture.completedFuture(null);
         }
@@ -1220,7 +1232,6 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);
 
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
             cnx.removeProducer(producerId);
             if (exception == null || !cnx.ctx().channel().isActive()) {
@@ -1236,7 +1247,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             return null;
         });
 
-        return closeFuture;
+        return compositeCloseFuture;
     }
 
     @VisibleForTesting
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 1fec08a43f1..7b9916b58fc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -169,6 +169,11 @@ public class TopicMessageImpl<T> implements Message<T> {
         return msg.getSchemaVersion();
     }
 
+    @Override
+    public Optional<byte[]> getSchemaId() {
+        return msg.getSchemaId();
+    }
+
     @Override
     public boolean isReplicated() {
         return msg.isReplicated();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 8ef9079091a..e2bb4b0cf97 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
 import static org.apache.pulsar.client.util.TypeCheckUtil.checkType;
 import java.nio.ByteBuffer;
 import java.util.Base64;
@@ -28,6 +29,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.EncodeData;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -77,7 +80,11 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
                     return null;
                 }
             }).orElseGet(() -> {
-                content = ByteBuffer.wrap(schema.encode(value));
+                EncodeData encodeData = schema.encode(producer.topic, value);
+                content = ByteBuffer.wrap(encodeData.data());
+                if (encodeData.hasSchemaId()) {
+                    msgMetadata.setSchemaId(encodeData.schemaId());
+                }
                 return this;
             });
         }
@@ -304,20 +311,30 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
         org.apache.pulsar.common.schema.KeyValue<K, V> keyValue =
                 (org.apache.pulsar.common.schema.KeyValue<K, V>) value;
 
+        EncodeData keyEncoded = null;
         // set key as the message key
         if (keyValue.getKey() != null) {
-            msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(
-                    keyValueSchema.getKeySchema().encode(keyValue.getKey())));
+            keyEncoded = keyValueSchema.getKeySchema().encode(producer.topic, 
keyValue.getKey());
+            
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(keyEncoded.data()));
             msgMetadata.setPartitionKeyB64Encoded(true);
         } else {
             msgMetadata.setNullPartitionKey(true);
         }
 
+        EncodeData valueEncoded = null;
         // set value as the payload
         if (keyValue.getValue() != null) {
-            content = 
ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue()));
+            valueEncoded = 
keyValueSchema.getValueSchema().encode(producer.topic, keyValue.getValue());
+            content = ByteBuffer.wrap(valueEncoded.data());
         } else {
             msgMetadata.setNullValue(true);
         }
+
+        byte[] schemaId = KeyValue.generateKVSchemaId(
+                keyEncoded != null && keyEncoded.hasSchemaId() ? 
keyEncoded.schemaId() : null,
+                valueEncoded != null && valueEncoded.hasSchemaId() ? 
valueEncoded.schemaId() : null);
+        if (isValidSchemaId(schemaId)) {
+            msgMetadata.setSchemaId(schemaId);
+        }
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
index b00370b414b..887ccc48756 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
@@ -19,13 +19,17 @@
 package org.apache.pulsar.client.impl.schema;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
+import static 
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.EncodeData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
@@ -102,6 +106,22 @@ public class KeyValueSchemaImpl<K, V> extends 
AbstractSchema<KeyValue<K, V>> imp
     private KeyValueSchemaImpl(Schema<K> keySchema,
                                Schema<V> valueSchema,
                                KeyValueEncodingType keyValueEncodingType) {
+        SchemaType keySchemaType = null;
+        if (keySchema != null && keySchema.getSchemaInfo() != null) {
+            keySchemaType = keySchema.getSchemaInfo().getType();
+        }
+        SchemaType valueSchemaType = null;
+        if (valueSchema != null && valueSchema.getSchemaInfo() != null) {
+            valueSchemaType = valueSchema.getSchemaInfo().getType();
+        }
+        if ((SchemaType.EXTERNAL.equals(keySchemaType)
+                && valueSchemaType != null && 
SchemaType.isStructType(valueSchemaType))
+                || (SchemaType.EXTERNAL.equals(valueSchemaType)
+                && keySchemaType != null && 
SchemaType.isStructType(keySchemaType))) {
+            throw new IllegalArgumentException("External schema cannot be used 
with other Pulsar struct schema types,"
+                    + "keySchemaType: " + keySchemaType + ", valueSchemaType: 
" + valueSchemaType);
+        }
+
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
         this.keyValueEncodingType = keyValueEncodingType;
@@ -133,18 +153,23 @@ public class KeyValueSchemaImpl<K, V> extends 
AbstractSchema<KeyValue<K, V>> imp
 
     // encode as bytes: [key.length][key.bytes][value.length][value.bytes] or 
[value.bytes]
     public byte[] encode(KeyValue<K, V> message) {
+        return encode(null, message).data();
+    }
+
+    @Override
+    public EncodeData encode(String topic, KeyValue<K, V> message) {
         if (keyValueEncodingType != null && keyValueEncodingType == 
KeyValueEncodingType.INLINE) {
             return KeyValue.encode(
+                topic,
                 message.getKey(),
                 keySchema,
                 message.getValue(),
-                valueSchema
-            );
+                valueSchema);
         } else {
             if (message.getValue() == null) {
                 return null;
             }
-            return valueSchema.encode(message.getValue());
+            return valueSchema.encode(topic, message.getValue());
         }
     }
 
@@ -166,6 +191,20 @@ public class KeyValueSchemaImpl<K, V> extends 
AbstractSchema<KeyValue<K, V>> imp
         return decode(ByteBufUtil.getBytes(byteBuf));
     }
 
+    @Override
+    public KeyValue<K, V> decode(String topic, byte[] data, byte[] schemaId) {
+        if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            throw new SchemaSerializationException("This method cannot be used 
under this SEPARATED encoding type");
+        }
+        return KeyValue.decode(data, (keyBytes, valueBytes) ->
+                decode(topic, keyBytes, valueBytes, schemaId));
+    }
+
+    @Override
+    public KeyValue<K, V> decode(String topic, ByteBuffer data, byte[] 
schemaId) {
+        return decode(topic, getBytes(data), schemaId);
+    }
+
     @Override
     public KeyValue<K, V> decode(ByteBuf byteBuf, byte[] schemaVersion) {
         return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion);
@@ -196,6 +235,42 @@ public class KeyValueSchemaImpl<K, V> extends 
AbstractSchema<KeyValue<K, V>> imp
         return new KeyValue<>(k, v);
     }
 
+    public KeyValue<K, V> decode(String topic, byte[] keyBytes, byte[] 
valueBytes, byte[] schemaId) {
+        K k = null;
+        byte[] keySchemaId = null;
+        byte[] valueSchemaId = null;
+        if (isValidSchemaId(schemaId)) {
+            var kvSchemaId = getKeyValueSchemaId(schemaId);
+            keySchemaId = kvSchemaId.getKey();
+            valueSchemaId = kvSchemaId.getValue();
+        }
+
+        if (keyBytes != null) {
+            if (keySchema.supportSchemaVersioning() && 
isValidSchemaId(keySchemaId)) {
+                k = keySchema.decode(topic, keyBytes, keySchemaId);
+            } else {
+                k = keySchema.decode(keyBytes);
+            }
+        }
+
+        V v = null;
+        if (valueBytes != null) {
+            if (valueSchema.supportSchemaVersioning() && 
isValidSchemaId(valueSchemaId)) {
+                v = valueSchema.decode(topic, valueBytes, valueSchemaId);
+            } else {
+                v = valueSchema.decode(valueBytes);
+            }
+        }
+        return new KeyValue<>(k, v);
+    }
+
+    private KeyValue<byte[], byte[]> getKeyValueSchemaId(byte[] schemaId) {
+        if 
(!SchemaType.EXTERNAL.equals(valueSchema.getSchemaInfo().getType())) {
+            return new KeyValue<>(schemaId, schemaId);
+        }
+        return KeyValue.getSchemaId(schemaId);
+    }
+
     public SchemaInfo getSchemaInfo() {
         return this.schemaInfo;
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 179c0081e07..6de7f805f93 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -375,7 +375,7 @@ public class KeyValueSchemaTest {
         } catch (SchemaSerializationException e) {
             Assert.assertTrue(e.getMessage().contains("This method cannot be 
used under this SEPARATED encoding type"));
         }
-        KeyValue<Foo, Bar>  keyValue = ((KeyValueSchemaImpl) 
keyValueSchema).decode(fooSchema.encode(foo),
+        KeyValue<Foo, Bar> keyValue = ((KeyValueSchemaImpl) 
keyValueSchema).decode(null, fooSchema.encode(foo),
                 encodeBytes, null);
         Foo fooBack = keyValue.getKey();
         Bar barBack = keyValue.getValue();
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index f64e7a91624..e92d1c09124 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -96,7 +96,7 @@ public class SupportVersioningKeyValueSchemaTest {
 
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
((KeyValueSchemaImpl) keyValueSchema).decode(
-                fooSchema.encode(foo), encodeBytes, new byte[10]);
+                null, fooSchema.encode(foo), encodeBytes, new byte[10]);
         Assert.assertTrue(keyValue.getValue().isField1());
         Assert.assertEquals(
                 
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
@@ -157,7 +157,7 @@ public class SupportVersioningKeyValueSchemaTest {
 
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
((KeyValueSchemaImpl) keyValueSchema).decode(
-                fooSchema.encode(foo), encodeBytes, new byte[10]);
+                null, fooSchema.encode(foo), encodeBytes, new byte[10]);
         Assert.assertTrue(keyValue.getValue().isField1());
         Assert.assertEquals(
                 
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
index 69a6cc36ae4..919efb89f0f 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.common.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
+import static org.apache.pulsar.common.schema.KeyValue.generateKVSchemaId;
+import static org.apache.pulsar.common.schema.KeyValue.getSchemaId;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
 import java.sql.Time;
@@ -133,4 +137,29 @@ public class KeyValueTest {
         }
     }
 
+    @DataProvider(name = "keyValueSchemaBytes")
+    public Object[][] keyValueSchemaBytes() {
+        return new Object[][] {
+                { null, null },
+                { new byte[0], new byte[0] },
+                { null, new byte[] {4, 5, 6, 7, 8} },
+                { new byte[0], new byte[] {4, 5, 6, 7, 8} },
+                { new byte[] {1, 2, 3}, null },
+                { new byte[] {1, 2, 3}, new byte[0] },
+                { new byte[] {1, 2, 3}, new byte[] {4, 5, 6, 7, 8} },
+        };
+    }
+
+    @Test(dataProvider = "keyValueSchemaBytes")
+    public void testEncodeDecodeSchemaId(byte[] keySchemaId, byte[] 
valueSchemaId) {
+        byte[] encoded = generateKVSchemaId(keySchemaId, valueSchemaId);
+        if (!isValidSchemaId(keySchemaId) && !isValidSchemaId(valueSchemaId)) {
+            assertFalse(isValidSchemaId(encoded));
+            return;
+        }
+        var decoded = getSchemaId(encoded);
+        assertEquals(keySchemaId == null ? new byte[0] : keySchemaId, 
decoded.getKey());
+        assertEquals(valueSchemaId == null ? new byte[0] : valueSchemaId, 
decoded.getValue());
+    }
+
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index c99c407b9d7..4016dfb2a98 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.EncodeData;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -38,9 +39,15 @@ import org.apache.pulsar.common.schema.SchemaType;
 public final class KeyValueSchemaInfo {
 
     private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new 
Schema<SchemaInfo>() {
+
+        @Override
+        public EncodeData encode(String topic, SchemaInfo si) {
+            return new EncodeData(si.getSchema());
+        }
+
         @Override
         public byte[] encode(SchemaInfo si) {
-            return si.getSchema();
+            return encode(null, si).data();
         }
 
         @Override
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 58d58a3acef..7aaa010c11e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -840,6 +840,9 @@ public class Commands {
             return Schema.Type.AutoConsume;
         } else if (type.getValue() < 0) {
             return Schema.Type.None;
+        } else if (type == SchemaType.EXTERNAL) {
+            // This is a special case, SchemaType.EXTERNAL number is not match 
the Schema.Type.EXTERNAL.
+            return Schema.Type.External;
         } else {
             return Schema.Type.valueOf(type.getValue());
         }
@@ -851,6 +854,9 @@ public class Commands {
         } else if (type.getValue() < 0) {
             // this is unexpected
             return SchemaType.NONE;
+        } else if (type == Schema.Type.External) {
+            // This is a special case, SchemaType.EXTERNAL number is not match 
the Schema.Type.EXTERNAL.
+            return SchemaType.EXTERNAL;
         } else {
             return SchemaType.valueOf(type.getValue());
         }
@@ -1858,6 +1864,9 @@ public class Commands {
         if (builder.hasSchemaVersion()) {
             messageMetadata.setSchemaVersion(builder.getSchemaVersion());
         }
+        if (builder.hasSchemaId()) {
+            messageMetadata.setSchemaId(builder.getSchemaId());
+        }
 
         return builder.getSequenceId();
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index a03daf05f99..1bfcf16beb5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -46,6 +46,7 @@ message Schema {
         LocalDateTime = 19;
         ProtobufNative = 20;
         AutoConsume = 21;
+        External = 22;
     }
 
     required string name = 1;
@@ -173,6 +174,7 @@ message MessageMetadata {
     // the `compacted_out` field in `SingleMessageMetadata` must be checked to 
identify and filter out compacted
     // messages (e.g., `k1 => v1` and `k1 => null` in the example above).
     repeated int32 compacted_batch_indexes = 31;
+    optional bytes schema_id = 32;
 }
 
 message SingleMessageMetadata {
diff --git 
a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java 
b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
index c7c8aa92a2f..e1e8a218345 100644
--- a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -384,6 +384,11 @@ public class HttpSinkTest {
                         return new byte[0];
                     }
 
+                    @Override
+                    public Optional<byte[]> getSchemaId() {
+                        return Optional.of(new byte[0]);
+                    }
+
                     @Override
                     public boolean isReplicated() {
                         return false;

Reply via email to