Changes to admin permissions are decisions that require some discussion on 
dev@pulsar.

Regards,
Dave

> On Jun 9, 2022, at 5:11 AM, techno...@apache.org wrote:
> 
> This is an automated email from the ASF dual-hosted git repository.
> 
> technoboy pushed a commit to branch master
> in repository https://gitbox.apache.org/repos/asf/pulsar.git
> 
> 
> The following commit(s) were added to refs/heads/master by this push:
>     new 91fe3b2ce26 [modify][admin] Change the permissions of the schema API 
> from Admin to normal produce/consume (#15956)
> 91fe3b2ce26 is described below
> 
> commit 91fe3b2ce264c3a51c349b29d86e566633f91d07
> Author: Jiwei Guo <techno...@apache.org>
> AuthorDate: Thu Jun 9 20:11:21 2022 +0800
> 
>    [modify][admin] Change the permissions of the schema API from Admin to 
> normal produce/consume (#15956)
> ---
> .../broker/admin/impl/SchemasResourceBase.java     |  15 ++-
> .../pulsar/broker/admin/v2/SchemasResource.java    |   6 +-
> .../broker/admin/AdminApiSchemaWithAuthTest.java   | 139 +++++++++++++++++++++
> 3 files changed, 153 insertions(+), 7 deletions(-)
> 
> diff --git 
> a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
>  
> b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> index 88487c11296..175ab5ac27c 100644
> --- 
> a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> +++ 
> b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SchemasResourceBase.java
> @@ -42,6 +42,7 @@ import org.apache.pulsar.broker.web.RestException;
> import org.apache.pulsar.client.internal.DefaultImplementation;
> import org.apache.pulsar.common.naming.TopicName;
> import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
> +import org.apache.pulsar.common.policies.data.TopicOperation;
> import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
> import org.apache.pulsar.common.protocol.schema.GetAllVersionsSchemaResponse;
> import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
> @@ -97,7 +98,7 @@ public class SchemasResourceBase extends AdminResource {
>     }
> 
>     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean 
> authoritative) {
> -        return validateDestinationAndAdminOperationAsync(authoritative)
> +        return validateOwnershipAndOperationAsync(authoritative, 
> TopicOperation.GET_METADATA)
>                 .thenApply(__ -> getSchemaId())
>                 .thenCompose(schemaId -> 
> pulsar().getSchemaRegistryService().getSchema(schemaId));
>     }
> @@ -115,7 +116,7 @@ public class SchemasResourceBase extends AdminResource {
>     }
> 
>     public CompletableFuture<SchemaAndMetadata> getSchemaAsync(boolean 
> authoritative, String version) {
> -        return validateDestinationAndAdminOperationAsync(authoritative)
> +        return validateOwnershipAndOperationAsync(authoritative, 
> TopicOperation.GET_METADATA)
>                 .thenApply(__ -> getSchemaId())
>                 .thenCompose(schemaId -> {
>                     ByteBuffer bbVersion = ByteBuffer.allocate(Long.BYTES);
> @@ -137,7 +138,7 @@ public class SchemasResourceBase extends AdminResource {
>     }
> 
>     public CompletableFuture<List<SchemaAndMetadata>> 
> getAllSchemasAsync(boolean authoritative) {
> -        return validateDestinationAndAdminOperationAsync(authoritative)
> +        return validateOwnershipAndOperationAsync(authoritative, 
> TopicOperation.GET_METADATA)
>                 .thenCompose(__ -> {
>                     String schemaId = getSchemaId();
>                     return 
> pulsar().getSchemaRegistryService().trimDeletedSchemaAndGetList(schemaId);
> @@ -312,7 +313,7 @@ public class SchemasResourceBase extends AdminResource {
>     }
> 
>     public CompletableFuture<Long> getVersionBySchemaAsync(PostSchemaPayload 
> payload, boolean authoritative) {
> -        return validateDestinationAndAdminOperationAsync(authoritative)
> +        return validateOwnershipAndOperationAsync(authoritative, 
> TopicOperation.GET_METADATA)
>                 .thenCompose(__ -> {
>                     String schemaId = getSchemaId();
>                     return pulsar().getSchemaRegistryService()
> @@ -427,5 +428,11 @@ public class SchemasResourceBase extends AdminResource {
>                 .thenCompose(__ -> 
> validateAdminAccessForTenantAsync(topicName.getTenant()));
>     }
> 
> +    private CompletableFuture<Void> 
> validateOwnershipAndOperationAsync(boolean authoritative,
> +                                                                       
> TopicOperation operation) {
> +        return validateTopicOwnershipAsync(topicName, authoritative)
> +                .thenCompose(__ -> validateTopicOperationAsync(topicName, 
> operation));
> +    }
> +
>     private static final Logger log = 
> LoggerFactory.getLogger(SchemasResourceBase.class);
> }
> diff --git 
> a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
>  
> b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> index e75eed8cd7c..335510f0893 100644
> --- 
> a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> +++ 
> b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
> @@ -218,7 +218,7 @@ public class SchemasResource extends SchemasResourceBase {
>             @PathParam("tenant") String tenant,
>             @PathParam("namespace") String namespace,
>             @PathParam("topic") String topic,
> -            @ApiParam(value = "A JSON value presenting a schema playload."
> +            @ApiParam(value = "A JSON value presenting a schema payload."
>                     + " An example of the expected schema can be found down 
> here.",
>                examples = @Example(value = @ExampleProperty(mediaType = 
> MediaType.APPLICATION_JSON,
>                value = "{\"type\": \"STRING\", \"schema\": \"\", 
> \"properties\": { \"key1\" : \"value1\" + } }")))
> @@ -264,7 +264,7 @@ public class SchemasResource extends SchemasResourceBase {
>             @PathParam("tenant") String tenant,
>             @PathParam("namespace") String namespace,
>             @PathParam("topic") String topic,
> -            @ApiParam(value = "A JSON value presenting a schema playload."
> +            @ApiParam(value = "A JSON value presenting a schema payload."
>                             + " An example of the expected schema can be 
> found down here.",
>              examples = @Example(value = @ExampleProperty(mediaType = 
> MediaType.APPLICATION_JSON,
>              value = "{\"type\": \"STRING\", \"schema\": \"\"," + " 
> \"properties\": { \"key1\" : \"value1\" + } }")))
> @@ -304,7 +304,7 @@ public class SchemasResource extends SchemasResourceBase {
>             @PathParam("tenant") String tenant,
>             @PathParam("namespace") String namespace,
>             @PathParam("topic") String topic,
> -            @ApiParam(value = "A JSON value presenting a schema playload."
> +            @ApiParam(value = "A JSON value presenting a schema payload."
>                             + " An example of the expected schema can be 
> found down here.",
>             examples = @Example(value = @ExampleProperty(mediaType = 
> MediaType.APPLICATION_JSON,
>             value = "{\"type\": \"STRING\", \"schema\": \"\"," + " 
> \"properties\": { \"key1\" : \"value1\" + } }")))
> diff --git 
> a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
>  
> b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
> new file mode 100644
> index 00000000000..29c0f97e610
> --- /dev/null
> +++ 
> b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaWithAuthTest.java
> @@ -0,0 +1,139 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *   http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.pulsar.broker.admin;
> +
> +import com.google.common.collect.Sets;
> +import io.jsonwebtoken.Jwts;
> +import io.jsonwebtoken.SignatureAlgorithm;
> +import lombok.extern.slf4j.Slf4j;
> +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
> +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
> +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
> +import org.apache.pulsar.client.admin.PulsarAdmin;
> +import org.apache.pulsar.client.admin.PulsarAdminBuilder;
> +import org.apache.pulsar.client.admin.PulsarAdminException;
> +import org.apache.pulsar.client.api.Schema;
> +import org.apache.pulsar.client.impl.auth.AuthenticationToken;
> +import org.apache.pulsar.common.policies.data.AuthAction;
> +import org.apache.pulsar.common.policies.data.ClusterData;
> +import org.apache.pulsar.common.policies.data.TenantInfoImpl;
> +import org.apache.pulsar.common.schema.SchemaInfo;
> +import org.mockito.Mockito;
> +import org.testng.annotations.AfterMethod;
> +import org.testng.annotations.BeforeMethod;
> +import org.testng.annotations.Test;
> +import javax.crypto.SecretKey;
> +import java.util.Base64;
> +import java.util.EnumSet;
> +import java.util.HashSet;
> +import java.util.List;
> +import java.util.Set;
> +import static org.testng.Assert.assertEquals;
> +import static org.testng.Assert.assertThrows;
> +import static org.testng.Assert.assertTrue;
> +/**
> + * Unit tests for schema admin api.
> + */
> +@Slf4j
> +@Test(groups = "broker-admin")
> +public class AdminApiSchemaWithAuthTest extends MockedPulsarServiceBaseTest {
> +
> +    private static final SecretKey SECRET_KEY = 
> AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
> +    private static final String ADMIN_TOKEN = 
> Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact();
> +    private static final String CONSUME_TOKEN = 
> Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact();
> +
> +    @BeforeMethod
> +    @Override
> +    public void setup() throws Exception {
> +        conf.setAuthorizationEnabled(true);
> +        conf.setAuthenticationEnabled(true);
> +        conf.getProperties().setProperty("tokenSecretKey", "data:;base64,"
> +                + 
> Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded()));
> +        Set<String> providers = new HashSet<>();
> +        providers.add(AuthenticationProviderToken.class.getName());
> +        Set<String> superUserRoles = new HashSet<>();
> +        superUserRoles.add("admin");
> +        conf.setSuperUserRoles(superUserRoles);
> +        conf.setAuthenticationProviders(providers);
> +        conf.setSystemTopicEnabled(false);
> +        conf.setTopicLevelPoliciesEnabled(false);
> +        super.internalSetup();
> +
> +        PulsarAdminBuilder pulsarAdminBuilder = 
> PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
> +                        ? brokerUrl.toString() : brokerUrlTls.toString())
> +                .authentication(AuthenticationToken.class.getName(),
> +                        ADMIN_TOKEN);
> +        admin = Mockito.spy(pulsarAdminBuilder.build());
> +
> +        // Setup namespaces
> +        admin.clusters().createCluster("test", 
> ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
> +        TenantInfoImpl tenantInfo = new 
> TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
> +        admin.tenants().createTenant("schematest", tenantInfo);
> +        admin.namespaces().createNamespace("schematest/test", 
> Sets.newHashSet("test"));
> +    }
> +
> +    @AfterMethod(alwaysRun = true)
> +    @Override
> +    public void cleanup() throws Exception {
> +        super.internalCleanup();
> +    }
> +
> +    @Test
> +    public void testGetCreateDeleteSchema() throws Exception {
> +        String topicName = "persistent://schematest/test/testCreateSchema";
> +        PulsarAdmin adminWithoutPermission = PulsarAdmin.builder()
> +                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
> brokerUrlTls.toString())
> +                .build();
> +        PulsarAdmin adminWithAdminPermission = PulsarAdmin.builder()
> +                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
> brokerUrlTls.toString())
> +                .authentication(AuthenticationToken.class.getName(), 
> ADMIN_TOKEN)
> +                .build();
> +        PulsarAdmin adminWithConsumePermission = PulsarAdmin.builder()
> +                .serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : 
> brokerUrlTls.toString())
> +                .authentication(AuthenticationToken.class.getName(), 
> CONSUME_TOKEN)
> +                .build();
> +        admin.topics().grantPermission(topicName, "consumer", 
> EnumSet.of(AuthAction.consume));
> +        admin.topics().grantPermission(topicName, "producer", 
> EnumSet.of(AuthAction.produce));
> +
> +        SchemaInfo si = Schema.BOOL.getSchemaInfo();
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().createSchema(topicName, si));
> +        adminWithAdminPermission.schemas().createSchema(topicName, si);
> +
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().getSchemaInfo(topicName));
> +        SchemaInfo readSi = 
> adminWithConsumePermission.schemas().getSchemaInfo(topicName);
> +        assertEquals(readSi, si);
> +
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().getSchemaInfo(topicName, 0));
> +        readSi = 
> adminWithConsumePermission.schemas().getSchemaInfo(topicName, 0);
> +        assertEquals(readSi, si);
> +        List<SchemaInfo> allSchemas = 
> adminWithConsumePermission.schemas().getAllSchemas(topicName);
> +        assertEquals(allSchemas.size(), 1);
> +
> +        SchemaInfo schemaInfo2 = Schema.BOOL.getSchemaInfo();
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().testCompatibility(topicName, schemaInfo2));
> +        
> assertTrue(adminWithAdminPermission.schemas().testCompatibility(topicName, 
> schemaInfo2).isCompatibility());
> +
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().getVersionBySchema(topicName, si));
> +        Long versionBySchema = 
> adminWithConsumePermission.schemas().getVersionBySchema(topicName, si);
> +        assertEquals(versionBySchema, Long.valueOf(0L));
> +
> +        assertThrows(PulsarAdminException.class, () -> 
> adminWithoutPermission.schemas().deleteSchema(topicName));
> +        adminWithAdminPermission.schemas().deleteSchema(topicName);
> +    }
> +}
> 

Reply via email to