If a change will break historical behavior, I agree it should start with a
discussion on the mailing list, and starting with a proposal is even better.

This PR, https://github.com/apache/pulsar/pull/15956, does not break the
existing behavior “The tenant admin can get schemas of a topic”.
The tenant admin can still get the schema.
It only fixes a bug: “A user has produce/consume permission for a topic but is
not able to get the schema”.
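
To make the before/after difference concrete, here is a minimal sketch of the
two checks. It is a simplified model written only for illustration, not the
broker's actual authorization code: the class SchemaPermissionSketch, its
methods, and the role names are hypothetical, and it assumes that holding
either produce or consume permission on the topic is enough for the new check.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the permission change; not Pulsar's real authorization provider.
public class SchemaPermissionSketch {

    // Stand-in for the produce/consume actions discussed in this thread.
    enum AuthAction { produce, consume }

    // Old REST behavior: only a tenant admin may read the schema.
    static boolean canGetSchemaBefore(String role, Set<String> tenantAdmins) {
        return tenantAdmins.contains(role);
    }

    // New REST behavior: tenant admins still pass, and a role that holds
    // produce or consume permission on the topic passes as well.
    static boolean canGetSchemaAfter(String role, Set<String> tenantAdmins,
                                     Map<String, Set<AuthAction>> topicGrants) {
        if (tenantAdmins.contains(role)) {
            return true;
        }
        Set<AuthAction> granted =
                topicGrants.getOrDefault(role, EnumSet.noneOf(AuthAction.class));
        return granted.contains(AuthAction.produce) || granted.contains(AuthAction.consume);
    }

    public static void main(String[] args) {
        Set<String> tenantAdmins = Set.of("role1");
        Map<String, Set<AuthAction>> grants =
                Map.of("consumer", EnumSet.of(AuthAction.consume));

        // The consume-only role is rejected by the old check and accepted by the new one.
        System.out.println(canGetSchemaBefore("consumer", tenantAdmins));
        System.out.println(canGetSchemaAfter("consumer", tenantAdmins, grants));
        // The tenant admin is accepted by both checks.
        System.out.println(canGetSchemaBefore("role1", tenantAdmins));
        System.out.println(canGetSchemaAfter("role1", tenantAdmins, grants));
    }
}
```

In this model a role with no grant at all is still rejected by the new check,
which matches the adminWithoutPermission cases in the test added by the PR.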

Thanks,
Penghui

On Jun 10, 2022, 03:16 +0800, Michael Marshall <mmarsh...@apache.org>, wrote:
> I think we're running into an important edge case in our default
> authorization framework. Should the binary protocol and the REST API
> have the same levels of permission for a given role?
>
> For example, when I enable auto topic creation and give a role
> permission to produce to a namespace, that role can create topics with
> the binary protocol, but it cannot perform admin operations to create
> that same topic.
>
> I think the binary protocol and the REST API should have symmetric
> permissions to make them easily understood by users, but that breaks
> with historical behavior, so I think we should discuss it on the
> mailing list before changing each of these discrepancies.
>
> Thanks,
> Michael
>
> On Thu, Jun 9, 2022 at 9:59 AM mattison chao <mattisonc...@apache.org> wrote:
> >
> > I think it's an addition to the previously missing feature.
> >
> > It makes sense to me.
> >
> > Best,
> > Mattison
> >
> > On Thu, 9 Jun 2022 at 22:24, PengHui Li <codelipeng...@gmail.com> wrote:
> >
> > > Hi Dave,
> > >
> > > This is a BUG fix.
> > >
> > > > The binary protocol allows a role with produce/consume permission to get
> > > > the schema, but the REST API does not.
> > > >
> > > > The schema REST API was added very early. We found that after granting
> > > > the consume permission to a user, the user was still not able to check
> > > > what the schema is for that topic.
> > > > This is very confusing.
> > >
> > > > A similar fix is https://github.com/apache/pulsar/pull/15501. There, a
> > > > role that had been granted all permissions for a namespace was not able
> > > > to get bundles, and therefore could not get the topic list.
> > > > It looks like that fix changed the get bundles permission from tenant
> > > > admin to produce/consume.
> > >
> > > Thanks,
> > > Penghui
> > > On Jun 9, 2022, 21:44 +0800, Dave Fisher <w...@apache.org>, wrote:
> > > > Changes to admin permissions are decisions that require some discussion
> > > on dev@pulsar.
> > > >
> > > > Regards,
> > > > Dave
> > > >
> > > > > On Jun 9, 2022, at 5:11 AM, techno...@apache.org wrote:
> > > > >
> > > > > This is an automated email from the ASF dual-hosted git repository.
> > > > >
> > > > > technoboy pushed a commit to branch master
> > > > > in repository https://gitbox.apache.org/repos/asf/pulsar.git
> > > > >
> > > > >
> > > > > The following commit(s) were added to refs/heads/master by this push:
> > > > > new 91fe3b2ce26 [modify][admin] Change the permissions of the schema
> > > API from Admin to normal produce/consume (#15956)
> > > > > 91fe3b2ce26 is described below
> > > > >
> > > > > commit 91fe3b2ce264c3a51c349b29d86e566633f91d07
> > > > > Author: Jiwei Guo <techno...@apache.org>
> > > > > AuthorDate: Thu Jun 9 20:11:21 2022 +0800
> > > > >
> > > > > [modify][admin] Change the permissions of the schema API from Admin to
> > > normal produce/consume (#15956)
> > > > > ---
> > > > > .../broker/admin/impl/SchemasResourceBase.java | 15 ++-
> > > > > .../pulsar/broker/admin/v2/SchemasResource.java | 6 +-
> > > > > .../broker/admin/AdminApiSchemaWithAuthTest.java | 139
> > > +++++++++++++++++++++
> > > > > 3 files changed, 153 insertions(+), 7 deletions(-)
> > > > >
> > > > > diff --git
> > > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> > > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> > > > > index 88487c11296..175ab5ac27c 100644
> > > > > ---
> > > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> > > > > +++
> > > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> > > > > @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.web.RestException;
> > > > > import org.apache.pulsar.client.internal.DefaultImplementation;
> > > > > import org.apache.pulsar.common.naming.TopicName;
> > > > > import
> > > org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
> > > > > +import org.apache.pulsar.common.policies.data.TopicOperation;
> > > > > import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
> > > > > import
> > > org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
> > > > > import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
> > > > > @@ -97,7 +98,7 @@ public class SchemasResourceBase extends
> > > AdminResource {
> > > > > }
> > > > >
> > > > > public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean
> > > authoritative) {
> > > > > - return validateDestinationAndAdminOperationAsync(authoritative)
> > > > > + return validateOwnershipAndOperationAsync(authoritative,
> > > TopicOperation.GET_METADATA)
> > > > > .thenApply(__ -> getSchemaId())
> > > > > .thenCompose(schemaId ->
> > > pulsar().getSchemaRegistryService().getSchema(schemaId));
> > > > > }
> > > > > @@ -115,7 +116,7 @@ public class SchemasResourceBase extends
> > > AdminResource {
> > > > > }
> > > > >
> > > > > public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean
> > > authoritative, String version) {
> > > > > - return validateDestinationAndAdminOperationAsync(authoritative)
> > > > > + return validateOwnershipAndOperationAsync(authoritative,
> > > TopicOperation.GET_METADATA)
> > > > > .thenApply(__ -> getSchemaId())
> > > > > .thenCompose(schemaId -> {
> > > > > ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
> > > > > @@ -137,7 +138,7 @@ public class SchemasResourceBase extends
> > > AdminResource {
> > > > > }
> > > > >
> > > > > public CompletableFuture<List<SchemaAndMetadata>>
> > > getAllSchemasAsync(boolean authoritative) {
> > > > > - return validateDestinationAndAdminOperationAsync(authoritative)
> > > > > + return validateOwnershipAndOperationAsync(authoritative,
> > > TopicOperation.GET_METADATA)
> > > > > .thenCompose(__ -> {
> > > > > String schemaId = getSchemaId();
> > > > > return
> > > pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId);
> > > > > @@ -312,7 +313,7 @@ public class SchemasResourceBase extends
> > > AdminResource {
> > > > > }
> > > > >
> > > > > public CompletableFuture<Long>
> > > getVersionBySchemaAsync(PostSchemaPayload payload, boolean authoritative) 
> > > {
> > > > > - return validateDestinationAndAdminOperationAsync(authoritative)
> > > > > + return validateOwnershipAndOperationAsync(authoritative,
> > > TopicOperation.GET_METADATA)
> > > > > .thenCompose(__ -> {
> > > > > String schemaId = getSchemaId();
> > > > > return pulsar().getSchemaRegistryService()
> > > > > @@ -427,5 +428,11 @@ public class SchemasResourceBase extends
> > > AdminResource {
> > > > > .thenCompose(__ ->
> > > validateAdminAccessForTenantAsync(topicName.getTenant()));
> > > > > }
> > > > >
> > > > > + private CompletableFuture<Void>
> > > validateOwnershipAndOperationAsync(boolean authoritative,
> > > > > + TopicOperation operation) {
> > > > > + return validateTopicOwnershipAsync(topicName, authoritative)
> > > > > + .thenCompose(__ -> validateTopicOperationAsync(topicName,
> > > operation));
> > > > > + }
> > > > > +
> > > > > private static final Logger log =
> > > LoggerFactory.getLogger(SchemasResourceBase.class);
> > > > > }
> > > > > diff --git
> > > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> > > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> > > > > index e75eed8cd7c..335510f0893 100644
> > > > > ---
> > > a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> > > > > +++
> > > b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> > > > > @@ -218,7 +218,7 @@ public class SchemasResource extends
> > > SchemasResourceBase {
> > > > > @PathParam("tenant") String tenant,
> > > > > @PathParam("namespace") String namespace,
> > > > > @PathParam("topic") String topic,
> > > > > - @ApiParam(value = "A JSON value presenting a schema playload."
> > > > > + @ApiParam(value = "A JSON value presenting a schema payload."
> > > > > + " An example of the expected schema can be found down here.",
> > > > > examples = @Example(value = @ExampleProperty(mediaType =
> > > MediaType.APPLICATION_JSON,
> > > > > value = "{\"type\": \"STRING\", \"schema\": \"\", \"properties\": {
> > > \"key1\" : \"value1\" + } }")))
> > > > > @@ -264,7 +264,7 @@ public class SchemasResource extends
> > > SchemasResourceBase {
> > > > > @PathParam("tenant") String tenant,
> > > > > @PathParam("namespace") String namespace,
> > > > > @PathParam("topic") String topic,
> > > > > - @ApiParam(value = "A JSON value presenting a schema playload."
> > > > > + @ApiParam(value = "A JSON value presenting a schema payload."
> > > > > + " An example of the expected schema can be found down here.",
> > > > > examples = @Example(value = @ExampleProperty(mediaType =
> > > MediaType.APPLICATION_JSON,
> > > > > value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\":
> > > { \"key1\" : \"value1\" + } }")))
> > > > > @@ -304,7 +304,7 @@ public class SchemasResource extends
> > > SchemasResourceBase {
> > > > > @PathParam("tenant") String tenant,
> > > > > @PathParam("namespace") String namespace,
> > > > > @PathParam("topic") String topic,
> > > > > - @ApiParam(value = "A JSON value presenting a schema playload."
> > > > > + @ApiParam(value = "A JSON value presenting a schema payload."
> > > > > + " An example of the expected schema can be found down here.",
> > > > > examples = @Example(value = @ExampleProperty(mediaType =
> > > MediaType.APPLICATION_JSON,
> > > > > value = "{\"type\": \"STRING\", \"schema\": \"\"," + " \"properties\":
> > > { \"key1\" : \"value1\" + } }")))
> > > > > diff --git
> > > a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
> > > b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
> > > > > new file mode 100644
> > > > > index 00000000000..29c0f97e610
> > > > > --- /dev/null
> > > > > +++
> > > b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
> > > > > @@ -0,0 +1,139 @@
> > > > > +/**
> > > > > + * Licensed to the Apache Software Foundation (ASF) under one
> > > > > + * or more contributor license agreements. See the NOTICE file
> > > > > + * distributed with this work for additional information
> > > > > + * regarding copyright ownership. The ASF licenses this file
> > > > > + * to you under the Apache License, Version 2.0 (the
> > > > > + * "License"); you may not use this file except in compliance
> > > > > + * with the License. You may obtain a copy of the License at
> > > > > + *
> > > > > + * http://www.apache.org/licenses/LICENSE-2.0
> > > > > + *
> > > > > + * Unless required by applicable law or agreed to in writing,
> > > > > + * software distributed under the License is distributed on an
> > > > > + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> > > > > + * KIND, either express or implied. See the License for the
> > > > > + * specific language governing permissions and limitations
> > > > > + * under the License.
> > > > > + */
> > > > > +package org.apache.pulsar.broker.admin;
> > > > > +
> > > > > +import com.google.common.collect.Sets;
> > > > > +import io.jsonwebtoken.Jwts;
> > > > > +import io.jsonwebtoken.SignatureAlgorithm;
> > > > > +import lombok.extern.slf4j.Slf4j;
> > > > > +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
> > > > > +import
> > > org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
> > > > > +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
> > > > > +import org.apache.pulsar.client.admin.PulsarAdmin;
> > > > > +import org.apache.pulsar.client.admin.PulsarAdminBuilder;
> > > > > +import org.apache.pulsar.client.admin.PulsarAdminException;
> > > > > +import org.apache.pulsar.client.api.Schema;
> > > > > +import org.apache.pulsar.client.impl.auth.AuthenticationToken;
> > > > > +import org.apache.pulsar.common.policies.data.AuthAction;
> > > > > +import org.apache.pulsar.common.policies.data.ClusterData;
> > > > > +import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> > > > > +import org.apache.pulsar.common.schema.SchemaInfo;
> > > > > +import org.mockito.Mockito;
> > > > > +import org.testng.annotations.AfterMethod;
> > > > > +import org.testng.annotations.BeforeMethod;
> > > > > +import org.testng.annotations.Test;
> > > > > +import javax.crypto.SecretKey;
> > > > > +import java.util.Base64;
> > > > > +import java.util.EnumSet;
> > > > > +import java.util.HashSet;
> > > > > +import java.util.List;
> > > > > +import java.util.Set;
> > > > > +import static org.testng.Assert.assertEquals;
> > > > > +import static org.testng.Assert.assertThrows;
> > > > > +import static org.testng.Assert.assertTrue;
> > > > > +/**
> > > > > + * Unit tests for schema admin api.
> > > > > + */
> > > > > +@Slf4j
> > > > > +@Test(groups = "broker-admin")
> > > > > +public class AdminApiSchemaWithAuthTest extends
> > > MockedPulsarServiceBaseTest {
> > > > > +
> > > > > + private static final SecretKey SECRET_KEY =
> > > AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
> > > > > + private static final String ADMIN_TOKEN =
> > > Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
> > > > > + private static final String CONSUME_TOKEN =
> > > Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
> > > > > +
> > > > > + @BeforeMethod
> > > > > + @Override
> > > > > + public void setup() throws Exception {
> > > > > + conf.setAuthorizationEnabled(true);
> > > > > + conf.setAuthenticationEnabled(true);
> > > > > + conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
> > > > > + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
> > > > > + Set<String> providers = new HashSet<>();
> > > > > + providers.add(AuthenticationProviderToken.class.getName());
> > > > > + Set<String> superUserRoles = new HashSet<>();
> > > > > + superUserRoles.add("admin");
> > > > > + conf.setSuperUserRoles(superUserRoles);
> > > > > + conf.setAuthenticationProviders(providers);
> > > > > + conf.setSystemTopicEnabled(false);
> > > > > + conf.setTopicLevelPoliciesEnabled(false);
> > > > > + super.internalSetup();
> > > > > +
> > > > > + PulsarAdminBuilder pulsarAdminBuilder =
> > > PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
> > > > > + ? brokerUrl.toString() : brokerUrlTls.toString())
> > > > > + .authentication(AuthenticationToken.class.getName(),
> > > > > + ADMIN_TOKEN);
> > > > > + admin = Mockito.spy(pulsarAdminBuilder.build());
> > > > > +
> > > > > + // Setup namespaces
> > > > > + admin.clusters().createCluster("test",
> > > ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
> > > > > + TenantInfoImpl tenantInfo = new
> > > TenantInfoImpl(Sets.newHashSet("role1", "role2"), 
> > > Sets.newHashSet("test"));
> > > > > + admin.tenants().createTenant("schematest", tenantInfo);
> > > > > + admin.namespaces().createNamespace("schematest/test",
> > > Sets.newHashSet("test"));
> > > > > + }
> > > > > +
> > > > > + @AfterMethod(alwaysRun = true)
> > > > > + @Override
> > > > > + public void cleanup() throws Exception {
> > > > > + super.internalCleanup();
> > > > > + }
> > > > > +
> > > > > + @Test
> > > > > + public void testGetCreateDeleteSchema() throws Exception {
> > > > > + String topicName = "persistent://schematest/test/testCreateSchema";
> > > > > + PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
> > > > > + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
> > > brokerUrlTls.toString())
> > > > > + .build();
> > > > > + PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
> > > > > + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
> > > brokerUrlTls.toString())
> > > > > + .authentication(AuthenticationToken.class.getName(), ADMIN_TOKEN)
> > > > > + .build();
> > > > > + PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder()
> > > > > + .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() :
> > > brokerUrlTls.toString())
> > > > > + .authentication(AuthenticationToken.class.getName(), CONSUME_TOKEN)
> > > > > + .build();
> > > > > + admin.topics().grantPermission(topicName, "consumer",
> > > EnumSet.of(AuthAction.consume));
> > > > > + admin.topics().grantPermission(topicName, "producer",
> > > EnumSet.of(AuthAction.produce));
> > > > > +
> > > > > + SchemaInfo si = Schema.BOOL.getSchemaInfo();
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().createSchema(topicName, si));
> > > > > + adminWithAdminPermission.schemas().createSchema(topicName, si);
> > > > > +
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().getSchemaInfo(topicName));
> > > > > + SchemaInfo readSi =
> > > adminWithConsumePermission.schemas().getSchemaInfo(topicName);
> > > > > + assertEquals(readSi, si);
> > > > > +
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
> > > > > + readSi =
> > > adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
> > > > > + assertEquals(readSi, si);
> > > > > + List<SchemaInfo> allSchemas =
> > > adminWithConsumePermission.schemas().getAllSchemas(topicName);
> > > > > + assertEquals(allSchemas.size(), 1);
> > > > > +
> > > > > + SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().testCompatibility(topicName, 
> > > schemaInfo2));
> > > > > +
> > > assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName,
> > > schemaInfo2).isCompatibility());
> > > > > +
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().getVersionBySchema(topicName, si));
> > > > > + Long versionBySchema =
> > > adminWithConsumePermission.schemas().getVersionBySchema(topicName, si);
> > > > > + assertEquals(versionBySchema, Long.valueOf(0L));
> > > > > +
> > > > > + assertThrows(PulsarAdminException.class, () ->
> > > adminWithoutPermission.schemas().deleteSchema(topicName));
> > > > > + adminWithAdminPermission.schemas().deleteSchema(topicName);
> > > > > + }
> > > > > +}
> > > > >
> > > >
> > >
