I think this change adds a feature that was previously missing. It makes sense to me.
Best, Mattison On Thu, 9 Jun 2022 at 22:24, PengHui Li <codelipeng...@gmail.com> wrote: > Hi Dave, > > This is a bug fix. > > The binary protocol allows a role with produce/consume permission to get the > schema. > But the REST API does not allow it. > > The schema REST API was added very early on, and we found that after granting > the consume permission > to a user, the user is still not able to check what the schema is for that > topic. > This is very confusing. > > There was a similar fix, https://github.com/apache/pulsar/pull/15501, for a role that was granted > all permissions for a namespace > but was not able to get bundles, and therefore couldn't get the topic list. > It looks like the get-bundles permission was changed from tenant admin to > produce/consume. > > Thanks, > Penghui > On Jun 9, 2022, 21:44 +0800, Dave Fisher <w...@apache.org>, wrote: > > Changes to admin permissions are decisions that require some discussion > on dev@pulsar. > > > > Regards, > > Dave > > > > > On Jun 9, 2022, at 5:11 AM, techno...@apache.org wrote: > > > > > > This is an automated email from the ASF dual-hosted git repository. 
> > > > > > technoboy pushed a commit to branch master > > > in repository https://gitbox.apache.org/repos/asf/pulsar.git > > > > > > > > > The following commit(s) were added to refs/heads/master by this push: > > > new 91fe3b2ce26 [modify][admin] Change the permissions of the schema > API from Admin to normal produce/consume (#15956) > > > 91fe3b2ce26 is described below > > > > > > commit 91fe3b2ce264c3a51c349b29d86e566633f91d07 > > > Author: Jiwei Guo <techno...@apache.org> > > > AuthorDate: Thu Jun 9 20:11:21 2022 +0800 > > > > > > [modify][admin] Change the permissions of the schema API from Admin to > normal produce/consume (#15956) > > > --- > > > .../broker/admin/impl/SchemasResourceBase.java | 15 ++- > > > .../pulsar/broker/admin/v2/SchemasResource.java | 6 +- > > > .../broker/admin/AdminApiSchemaWithAuthTest.java | 139 > +++++++++++++++++++++ > > > 3 files changed, 153 insertions(+), 7 deletions(-) > > > > > > diff --git > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java > > > index 88487c11296..175ab5ac27c 100644 > > > --- > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java > > > +++ > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java > > > @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.web.RestException; > > > import org.apache.pulsar.client.internal.DefaultImplementation; > > > import org.apache.pulsar.common.naming.TopicName; > > > import > org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; > > > +import org.apache.pulsar.common.policies.data.TopicOperation; > > > import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse; > > > import > org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse; > > > import org.apache.pulsar.common.protocol.schema.GetSchemaResponse; > > > @@ -97,7 +98,7 @@ public 
class SchemasResourceBase extends > AdminResource { > > > } > > > > > > public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean > authoritative) { > > > - return validateDestinationAndAdminOperationAsync(authoritative) > > > + return validateOwnershipAndOperationAsync(authoritative, > TopicOperation.GET_METADATA) > > > .thenApply(__ -> getSchemaId()) > > > .thenCompose(schemaId -> > pulsar().getSchemaRegistryService().getSchema(schemaId)); > > > } > > > @@ -115,7 +116,7 @@ public class SchemasResourceBase extends > AdminResource { > > > } > > > > > > public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean > authoritative, String version) { > > > - return validateDestinationAndAdminOperationAsync(authoritative) > > > + return validateOwnershipAndOperationAsync(authoritative, > TopicOperation.GET_METADATA) > > > .thenApply(__ -> getSchemaId()) > > > .thenCompose(schemaId -> { > > > ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES); > > > @@ -137,7 +138,7 @@ public class SchemasResourceBase extends > AdminResource { > > > } > > > > > > public CompletableFuture<List<SchemaAndMetadata>> > getAllSchemasAsync(boolean authoritative) { > > > - return validateDestinationAndAdminOperationAsync(authoritative) > > > + return validateOwnershipAndOperationAsync(authoritative, > TopicOperation.GET_METADATA) > > > .thenCompose(__ -> { > > > String schemaId = getSchemaId(); > > > return > pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId); > > > @@ -312,7 +313,7 @@ public class SchemasResourceBase extends > AdminResource { > > > } > > > > > > public CompletableFuture<Long> > getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) { > > > - return validateDestinationAndAdminOperationAsync(authoritative) > > > + return validateOwnershipAndOperationAsync(authoritative, > TopicOperation.GET_METADATA) > > > .thenCompose(__ -> { > > > String schemaId = getSchemaId(); > > > return pulsar().getSchemaRegistryService() > > > 
@@ -427,5 +428,11 @@ public class SchemasResourceBase extends > AdminResource { > > > .thenCompose(__ -> > validateAdminAccessForTenantAsync(topicName.getTenant())); > > > } > > > > > > + private CompletableFuture<Void> > validateOwnershipAndOperationAsync(boolean authoritative, > > > + TopicOperation operation) { > > > + return validateTopicOwnershipAsync(topicName, authoritative) > > > + .thenCompose(__ -> validateTopicOperationAsync(topicName, > operation)); > > > + } > > > + > > > private static final Logger log = > LoggerFactory.getLogger(SchemasResourceBase.class); > > > } > > > diff --git > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java > > > index e75eed8cd7c..335510f0893 100644 > > > --- > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java > > > +++ > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java > > > @@ -218,7 +218,7 @@ public class SchemasResource extends > SchemasResourceBase { > > > @PathParam("tenant") String tenant, > > > @PathParam("namespace") String namespace, > > > @PathParam("topic") String topic, > > > - @ApiParam(value = "A JSON value presenting a schema playload." > > > + @ApiParam(value = "A JSON value presenting a schema payload." > > > + " An example of the expected schema can be found down here.", > > > examples = @Example(value = @ExampleProperty(mediaType = > MediaType.APPLICATION_JSON, > > > value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": { > \"key1\" : \"value1\" + } }"))) > > > @@ -264,7 +264,7 @@ public class SchemasResource extends > SchemasResourceBase { > > > @PathParam("tenant") String tenant, > > > @PathParam("namespace") String namespace, > > > @PathParam("topic") String topic, > > > - @ApiParam(value = "A JSON value presenting a schema playload." 
> > > + @ApiParam(value = "A JSON value presenting a schema payload." > > > + " An example of the expected schema can be found down here.", > > > examples = @Example(value = @ExampleProperty(mediaType = > MediaType.APPLICATION_JSON, > > > value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": > { \"key1\" : \"value1\" + } }"))) > > > @@ -304,7 +304,7 @@ public class SchemasResource extends > SchemasResourceBase { > > > @PathParam("tenant") String tenant, > > > @PathParam("namespace") String namespace, > > > @PathParam("topic") String topic, > > > - @ApiParam(value = "A JSON value presenting a schema playload." > > > + @ApiParam(value = "A JSON value presenting a schema payload." > > > + " An example of the expected schema can be found down here.", > > > examples = @Example(value = @ExampleProperty(mediaType = > MediaType.APPLICATION_JSON, > > > value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\": > { \"key1\" : \"value1\" + } }"))) > > > diff --git > a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java > b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java > > > new file mode 100644 > > > index 00000000000..29c0f97e610 > > > --- /dev/null > > > +++ > b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java > > > @@ -0,0 +1,139 @@ > > > +/** > > > + * Licensed to the Apache Software Foundation (ASF) under one > > > + * or more contributor license agreements. See the NOTICE file > > > + * distributed with this work for additional information > > > + * regarding copyright ownership. The ASF licenses this file > > > + * to you under the Apache License, Version 2.0 (the > > > + * "License"); you may not use this file except in compliance > > > + * with the License. 
You may obtain a copy of the License at > > > + * > > > + * http://www.apache.org/licenses/LICENSE-2.0 > > > + * > > > + * Unless required by applicable law or agreed to in writing, > > > + * software distributed under the License is distributed on an > > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY > > > + * KIND, either express or implied. See the License for the > > > + * specific language governing permissions and limitations > > > + * under the License. > > > + */ > > > +package org.apache.pulsar.broker.admin; > > > + > > > +import com.google.common.collect.Sets; > > > +import io.jsonwebtoken.Jwts; > > > +import io.jsonwebtoken.SignatureAlgorithm; > > > +import lombok.extern.slf4j.Slf4j; > > > +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; > > > +import > org.apache.pulsar.broker.authentication.AuthenticationProviderToken; > > > +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; > > > +import org.apache.pulsar.client.admin.PulsarAdmin; > > > +import org.apache.pulsar.client.admin.PulsarAdminBuilder; > > > +import org.apache.pulsar.client.admin.PulsarAdminException; > > > +import org.apache.pulsar.client.api.Schema; > > > +import org.apache.pulsar.client.impl.auth.AuthenticationToken; > > > +import org.apache.pulsar.common.policies.data.AuthAction; > > > +import org.apache.pulsar.common.policies.data.ClusterData; > > > +import org.apache.pulsar.common.policies.data.TenantInfoImpl; > > > +import org.apache.pulsar.common.schema.SchemaInfo; > > > +import org.mockito.Mockito; > > > +import org.testng.annotations.AfterMethod; > > > +import org.testng.annotations.BeforeMethod; > > > +import org.testng.annotations.Test; > > > +import javax.crypto.SecretKey; > > > +import java.util.Base64; > > > +import java.util.EnumSet; > > > +import java.util.HashSet; > > > +import java.util.List; > > > +import java.util.Set; > > > +import static org.testng.Assert.assertEquals; > > > +import static 
org.testng.Assert.assertThrows; > > > +import static org.testng.Assert.assertTrue; > > > +/** > > > + * Unit tests for schema admin api. > > > + */ > > > +@Slf4j > > > +@Test(groups = "broker-admin") > > > +public class AdminApiSchemaWithAuthTest extends > MockedPulsarServiceBaseTest { > > > + > > > + private static final SecretKey SECRET_KEY = > AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); > > > + private static final String ADMIN_TOKEN = > Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact(); > > > + private static final String CONSUME_TOKEN = > Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact(); > > > + > > > + @BeforeMethod > > > + @Override > > > + public void setup() throws Exception { > > > + conf.setAuthorizationEnabled(true); > > > + conf.setAuthenticationEnabled(true); > > > + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," > > > + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); > > > + Set<String> providers = new HashSet<>(); > > > + providers.add(AuthenticationProviderToken.class.getName()); > > > + Set<String> superUserRoles = new HashSet<>(); > > > + superUserRoles.add("admin"); > > > + conf.setSuperUserRoles(superUserRoles); > > > + conf.setAuthenticationProviders(providers); > > > + conf.setSystemTopicEnabled(false); > > > + conf.setTopicLevelPoliciesEnabled(false); > > > + super.internalSetup(); > > > + > > > + PulsarAdminBuilder pulsarAdminBuilder = > PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null > > > + ? 
brokerUrl.toString() : brokerUrlTls.toString()) > > > + .authentication(AuthenticationToken.class.getName(), > > > + ADMIN_TOKEN); > > > + admin = Mockito.spy(pulsarAdminBuilder.build()); > > > + > > > + // Setup namespaces > > > + admin.clusters().createCluster("test", > ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); > > > + TenantInfoImpl tenantInfo = new > TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); > > > + admin.tenants().createTenant("schematest", tenantInfo); > > > + admin.namespaces().createNamespace("schematest/test", > Sets.newHashSet("test")); > > > + } > > > + > > > + @AfterMethod(alwaysRun = true) > > > + @Override > > > + public void cleanup() throws Exception { > > > + super.internalCleanup(); > > > + } > > > + > > > + @Test > > > + public void testGetCreateDeleteSchema() throws Exception { > > > + String topicName = "persistent://schematest/test/testCreateSchema"; > > > + PulsarAdmin adminWithoutPermission = PulsarAdmin.builder() > > > + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : > brokerUrlTls.toString()) > > > + .build(); > > > + PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder() > > > + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : > brokerUrlTls.toString()) > > > + .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN) > > > + .build(); > > > + PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder() > > > + .serviceHttpUrl(brokerUrl != null ? 
brokerUrl.toString() : > brokerUrlTls.toString()) > > > + .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN) > > > + .build(); > > > + admin.topics().grantPermission(topicName, "consumer", > EnumSet.of(AuthAction.consume)); > > > + admin.topics().grantPermission(topicName, "producer", > EnumSet.of(AuthAction.produce)); > > > + > > > + SchemaInfo si = Schema.BOOL.getSchemaInfo(); > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().createSchema(topicName, si)); > > > + adminWithAdminPermission.schemas().createSchema(topicName, si); > > > + > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().getSchemaInfo(topicName)); > > > + SchemaInfo readSi = > adminWithConsumePermission.schemas().getSchemaInfo(topicName); > > > + assertEquals(readSi, si); > > > + > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().getSchemaInfo(topicName, 0)); > > > + readSi = > adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0); > > > + assertEquals(readSi, si); > > > + List<SchemaInfo> allSchemas = > adminWithConsumePermission.schemas().getAllSchemas(topicName); > > > + assertEquals(allSchemas.size(), 1); > > > + > > > + SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo(); > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2)); > > > + > assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, > schemaInfo2).isCompatibility()); > > > + > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().getVersionBySchema(topicName, si)); > > > + Long versionBySchema = > adminWithConsumePermission.schemas().getVersionBySchema(topicName, si); > > > + assertEquals(versionBySchema, Long.valueOf(0L)); > > > + > > > + assertThrows(PulsarAdminException.class, () -> > adminWithoutPermission.schemas().deleteSchema(topicName)); > > > + 
adminWithAdminPermission.schemas().deleteSchema(topicName); > > > + } > > > +} > > > > > >