[GitHub] [pulsar-dotpulsar] VisualBean commented on issue #101: `AuthenticateUsingToken(Func> tokenFactory)` is not a part of 2.3.0

2022-04-03 Thread GitBox


VisualBean commented on issue #101:
URL: https://github.com/apache/pulsar-dotpulsar/issues/101#issuecomment-1086892520


   I never found it, but I ended up implementing `IAuthentication` myself 
instead, which worked great for OAuth using MSAL. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15015: [enh][transaction] Optimize to reuse transaction buffer snapshot writer

2022-04-03 Thread GitBox


Technoboy- commented on code in PR #15015:
URL: https://github.com/apache/pulsar/pull/15015#discussion_r841313610


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java:
##
@@ -18,69 +18,168 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCounted;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
 import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
 import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
 import 
org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.PulsarClient;
-import 
org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
-import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class SystemTopicBaseTxnBufferSnapshotService implements 
TransactionBufferSnapshotService {
 
-private final Map> 
clients;
+private final Map> clients;
 
 private final NamespaceEventsSystemTopicFactory 
namespaceEventsSystemTopicFactory;
 
-public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+private final ScheduledExecutorService scheduledExecutorService;
+private final ConcurrentHashMap 
writerFutureMap;
+private final 
LinkedList>> 
pendingCloseWriterList;
+
+// The class ReferenceCountedWriter will maintain the reference count,
+// when the reference count decrement to 0, it will be removed from 
writerFutureMap, the writer will be closed.
+public static class ReferenceCountedWriter extends 
AbstractReferenceCounted {
+
+private final NamespaceName namespaceName;
+private final SystemTopicBaseTxnBufferSnapshotService service;
+private CompletableFuture> future;
+private final Backoff backoff;
+
+protected ReferenceCountedWriter(NamespaceName namespaceName,
+ 
SystemTopicBaseTxnBufferSnapshotService service) {
+this.namespaceName = namespaceName;
+this.service = service;
+this.backoff = new Backoff(1, TimeUnit.SECONDS, 3, 
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
+initWriterFuture();
+}
+
+private void initWriterFuture() {
+this.future = 
service.getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync();
+
this.future.thenRunAsync(this.backoff::reset).exceptionally(throwable -> {
+long delay = backoff.next();
+log.error("[{}] Failed to new transaction buffer system topic 
writer," +
+"try to re-create the writer in {} ms.", 
delay, namespaceName, throwable);
+service.scheduledExecutorService.schedule(
+SafeRun.safeRun(this::initWriterFuture), delay, 
TimeUnit.MILLISECONDS);
+return null;
+});
+}
+
+public CompletableFuture> 
getFuture() {
+if (future == null) {
+initWriterFuture();
+}
+return future;
+}
+
+@Override
+protected void deallocate() {
+ReferenceCountedWriter referenceCountedWriter = 
service.writerFutureMap.remove(namespaceName);
+if (referenceCountedWriter != null && 
referenceCountedWriter.getFuture() != null) {
+
service.pendingCloseWriterList.add(referenceCountedWriter.getFuture());
+service.closePendingCloseWriter();
+}
+}
+
+@Override
+public ReferenceCounted touch(Object o) {
+return this;
+}
+
+}
+
+public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client,
+   ScheduledExecutorService 
scheduledExecutorService) {
 this.namespaceEventsSystemTopicFactory = new 
NamespaceEventsSystemTopicFactory(client);
 this.clients = new ConcurrentHashMap<>();
+this.scheduledExecutorService = scheduledExecutorService;
+this.writerFutureMap = new ConcurrentHashMap<>();
+this.pendingClose

[GitHub] [pulsar-site] momo-jun commented on pull request #36: [feature][doc] Apply the new REST API topic to versioned docs

2022-04-03 Thread GitBox


momo-jun commented on PR #36:
URL: https://github.com/apache/pulsar-site/pull/36#issuecomment-1087050743

   > @momo-jun why not update docs earlier than 2.7.2? Don't we need to update 
docs for all versions otherwise users do not have restapi doc entrance?
   
   My understanding is that the maintenance rule covers only the versioned docs 
released in the last year. Is that correct?





[GitHub] [pulsar] Jason918 commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

2022-04-03 Thread GitBox


Jason918 commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r841339916


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##
@@ -67,216 +67,144 @@
 
 protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, 
boolean authoritative,
Integer coordinatorId) {
-if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-if (coordinatorId != null) {
-
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-authoritative);
-TransactionMetadataStore transactionMetadataStore =
-
pulsar().getTransactionMetadataStoreService().getStores()
-
.get(TransactionCoordinatorID.get(coordinatorId));
-if (transactionMetadataStore == null) {
-asyncResponse.resume(new RestException(NOT_FOUND,
-"Transaction coordinator not found! coordinator id 
: " + coordinatorId));
+if (coordinatorId != null) {
+
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+authoritative);
+TransactionMetadataStore transactionMetadataStore =
+pulsar().getTransactionMetadataStoreService().getStores()
+.get(TransactionCoordinatorID.get(coordinatorId));
+if (transactionMetadataStore == null) {
+asyncResponse.resume(new RestException(NOT_FOUND,
+"Transaction coordinator not found! coordinator id : " 
+ coordinatorId));
+return;
+}
+
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+} else {
+
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+false, false).thenAccept(partitionMetadata -> {
+if (partitionMetadata.partitions == 0) {
+asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
+"Transaction coordinator not found"));
 return;
 }
-
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-} else {
-
getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-false, false).thenAccept(partitionMetadata -> {
-if (partitionMetadata.partitions == 0) {
-asyncResponse.resume(new 
RestException(Response.Status.NOT_FOUND,
-"Transaction coordinator not found"));
+List> 
transactionMetadataStoreInfoFutures =
+Lists.newArrayList();
+for (int i = 0; i < partitionMetadata.partitions; i++) {
+try {
+transactionMetadataStoreInfoFutures
+
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+} catch (PulsarServerException e) {
+asyncResponse.resume(new RestException(e));
 return;
 }
-List> 
transactionMetadataStoreInfoFutures =
-Lists.newArrayList();
-for (int i = 0; i < partitionMetadata.partitions; i++) {
+}
+Map stats = new 
HashMap<>();
+
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
+if (e != null) {
+asyncResponse.resume(new RestException(e));
+return;
+}
+
+for (int i = 0; i < 
transactionMetadataStoreInfoFutures.size(); i++) {
 try {
-transactionMetadataStoreInfoFutures
-
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-} catch (PulsarServerException e) {
-asyncResponse.resume(new RestException(e));
+stats.put(i, 
transactionMetadataStoreInfoFutures.get(i).get());
+} catch (Exception exception) {
+asyncResponse.resume(new 
RestException(exception.getCause()));
 return;
 }
 }
-Map stats = new 
HashMap<>();
-
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result,
 e) -> {
-if (e != null) {
-asyncResponse.resume(new RestException(e));

[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #14985: Add a cache eviction policy:Evicting cache data by the slowest markDeletedPosition

2022-04-03 Thread GitBox


lordcheng10 commented on code in PR #14985:
URL: https://github.com/apache/pulsar/pull/14985#discussion_r84115


##
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##
@@ -291,6 +291,185 @@ public void acknowledge1() throws Exception {
 ledger.close();
 }
 
+@Test
+public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
+final CountDownLatch counter = new CountDownLatch(1);
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setCacheEvictionByMarkDeletedPosition(true);
+factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
+.toNanos(3));
+factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
+@Override
+public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
+ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() 
{
+@Override
+public void openCursorComplete(ManagedCursor cursor, 
Object ctx) {
+ManagedLedger ledger = (ManagedLedger) ctx;
+String message1 = "test";
+ledger.asyncAddEntry(message1.getBytes(Encoding), new 
AddEntryCallback() {
+@Override
+public void addComplete(Position position, ByteBuf 
entryData, Object ctx) {
+@SuppressWarnings("unchecked")
+Pair pair = 
(Pair) ctx;
+ManagedLedger ledger = pair.getLeft();
+ManagedCursor cursor = pair.getRight();
+assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+cursor.asyncReadEntries(1, new 
ReadEntriesCallback() {
+@Override
+public void 
readEntriesComplete(List entries, Object ctx) {
+ManagedCursor cursor = (ManagedCursor) 
ctx;
+assertEquals(entries.size(), 1);
+Entry entry = entries.get(0);
+final Position position = 
entry.getPosition();
+assertEquals(new 
String(entry.getDataAndRelease(), Encoding), message1);
+((ManagedLedgerImpl) 
ledger).doCacheEviction(
+System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(3));
+assertEquals(((ManagedLedgerImpl) 
ledger).getCacheSize(), message1.getBytes(Encoding).length);
+
+log.debug("Mark-Deleting to position 
{}", position);
+cursor.asyncMarkDelete(position, new 
MarkDeleteCallback() {
+@Override
+public void 
markDeleteComplete(Object ctx) {
+log.debug("Mark delete 
complete");
+ManagedCursor cursor = 
(ManagedCursor) ctx;
+
assertFalse(cursor.hasMoreEntries());
+((ManagedLedgerImpl) 
ledger).doCacheEviction(
+System.nanoTime() - 
TimeUnit.MILLISECONDS.toNanos(3));
+
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0);
+
+counter.countDown();
+}
+
+@Override
+public void 
markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+fail(exception.getMessage());
+}
+
+}, cursor);
+}
+
+@Override
+public void 
readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+fail(exception.getMessage());
+}

Review Comment:
   Calling `fail` directly does make the test fail when I run it locally. 
@eolivelli 
   I have tried the following ways:
   public void testCacheEvictionByMarkDeletedPosition() throws Throwable{
   fail("xxx");
   .
   }
   
   public void testCacheEvictionByMarkDeletedPosition() throws Throwable{
   .
   public void addComplete(Position position, ByteBuf entryData, Object c

[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token

2022-04-03 Thread GitBox


kkoderok commented on code in PR #13339:
URL: https://github.com/apache/pulsar/pull/13339#discussion_r841390236


##
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java:
##
@@ -439,11 +451,65 @@ ClientConfigurationData createClientConfiguration() 
throws UnsupportedAuthentica
 private PulsarClientImpl createClient(final ClientConfigurationData 
clientConf, final AuthData clientAuthData,
 final String clientAuthMethod, final int protocolVersion) throws 
PulsarClientException {
 this.connectionPool = new ProxyConnectionPool(clientConf, 
service.getWorkerGroup(),
-() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), 
clientAuthRole, clientAuthData,
-clientAuthMethod, protocolVersion));
+() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), 
clientAuthRole,
+this::getOrRefreshClientAuthData, clientAuthMethod, 
protocolVersion,
+
service.getConfiguration().isForwardAuthorizationCredentials()));
 return new PulsarClientImpl(clientConf, service.getWorkerGroup(), 
connectionPool, service.getTimer());
 }
 
+private void updateClientAuthData(AuthData clientData) {
+this.clientAuthData = clientData;
+authFutureList.getAndSet(Collections.emptyList())
+.forEach(future -> future.complete(clientData));
+}
+
+private CompletableFuture getOrRefreshClientAuthData() {

Review Comment:
   When the client auth data has expired and the proxy is configured to forward 
authentication data, this method requests a new token. The method is thread 
safe, since one ProxyConnection has multiple ProxyClientCnx instances that may 
request auth data at the same time.
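
   The coalescing idea described above can be sketched roughly as follows. This 
is a minimal illustration, not the code from the PR: the class name 
AuthRefreshSketch, the requestRefresh hook, and the use of String in place of 
AuthData are all assumptions made to keep the example dependency-free. Every 
caller gets its own future, only the first waiter triggers a refresh request, 
and a single update completes all pending futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the proxy-side refresh logic; not the PR's code.
// String stands in for AuthData to keep the sketch dependency-free.
class AuthRefreshSketch {
    private final List<CompletableFuture<String>> waiters = new ArrayList<>();
    private final Runnable requestRefresh; // assumed hook, e.g. "ask the client for a new token"

    AuthRefreshSketch(Runnable requestRefresh) {
        this.requestRefresh = requestRefresh;
    }

    // Called by any connection that needs fresh auth data.
    CompletableFuture<String> refresh() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean first;
        synchronized (waiters) {
            first = waiters.isEmpty();
            waiters.add(future);
        }
        if (first) {
            requestRefresh.run(); // only the first waiter triggers a request
        }
        return future;
    }

    // Called once when the new auth data arrives; completes every pending waiter.
    void update(String authData) {
        List<CompletableFuture<String>> toComplete;
        synchronized (waiters) {
            toComplete = new ArrayList<>(waiters);
            waiters.clear();
        }
        toComplete.forEach(f -> f.complete(authData));
    }

    public static void main(String[] args) {
        AuthRefreshSketch sketch = new AuthRefreshSketch(() -> System.out.println("refresh requested"));
        CompletableFuture<String> a = sketch.refresh();
        CompletableFuture<String> b = sketch.refresh();
        sketch.update("new-token");
        System.out.println(a.join() + " " + b.join());
    }
}
```

   Completing the futures outside the synchronized block keeps callbacks from 
running while the lock is held.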






[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token

2022-04-03 Thread GitBox


kkoderok commented on code in PR #13339:
URL: https://github.com/apache/pulsar/pull/13339#discussion_r841398032


##
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##
@@ -240,16 +253,23 @@ public void channelActive(ChannelHandlerContext ctx) 
throws Exception {
 ctx.close();
 }
 });
+} else {
+log.warn("Error during handshake", th);
+ctx.close();
+}
+});
 }
 
-protected ByteBuf newConnectCommand() throws Exception {
+protected CompletableFuture newConnectCommand() throws Exception {
 // mutual authentication is to auth between `remoteHostName` and this 
client for this channel.
 // each channel will have a mutual client/server pair, mutual client 
evaluateChallenge with init data,
 // and return authData to server.
 authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
 AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
-PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, 
null, null);
+return CompletableFuture.completedFuture(

Review Comment:
   This method is overridden in the ProxyClientCnx class, where it may need to 
obtain authorization data from the client asynchronously, so the method 
returns a CompletableFuture.
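
   A minimal sketch of that design, under stated assumptions: BaseCnxSketch and 
ProxyCnxSketch are hypothetical stand-ins for ClientCnx and ProxyClientCnx, and 
String stands in for both ByteBuf and AuthData. The base class already has its 
result, so it wraps it in a completed future; the subclass waits for a 
supplier. The caller treats both the same way.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for the base connection class.
class BaseCnxSketch {
    // The base command is available immediately, so it is wrapped in a completed future.
    protected CompletableFuture<String> newConnectCommand() {
        return CompletableFuture.completedFuture("CONNECT");
    }

    // The caller does not care whether the command was produced synchronously or not.
    final CompletableFuture<String> handshake() {
        return newConnectCommand().thenApply(cmd -> "sent:" + cmd);
    }
}

// Hypothetical stand-in for the proxy subclass that needs client auth data first.
class ProxyCnxSketch extends BaseCnxSketch {
    private final Supplier<CompletableFuture<String>> clientAuthDataSupplier;

    ProxyCnxSketch(Supplier<CompletableFuture<String>> clientAuthDataSupplier) {
        this.clientAuthDataSupplier = clientAuthDataSupplier;
    }

    @Override
    protected CompletableFuture<String> newConnectCommand() {
        // The command can only be built once the auth data future completes.
        return clientAuthDataSupplier.get().thenApply(auth -> "CONNECT[" + auth + "]");
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> result = new ProxyCnxSketch(() -> pending).handshake();
        pending.complete("jwt");
        System.out.println(result.join());
    }
}
```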






[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token

2022-04-03 Thread GitBox


kkoderok commented on code in PR #13339:
URL: https://github.com/apache/pulsar/pull/13339#discussion_r841401389


##
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##
@@ -20,43 +20,93 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ProxyClientCnx extends ClientCnx {
 
-String clientAuthRole;
-AuthData clientAuthData;
-String clientAuthMethod;
-int protocolVersion;
+private String clientAuthRole;
+private String clientAuthMethod;
+private int protocolVersion;
+private boolean forwardAuthorizationCredentials;
+private Supplier> clientAuthDataSupplier;
 
 public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, String clientAuthRole,
-  AuthData clientAuthData, String clientAuthMethod, 
int protocolVersion) {
+  Supplier> 
clientAuthDataSupplier,
+  String clientAuthMethod, int protocolVersion, 
boolean forwardAuthorizationCredentials) {
 super(conf, eventLoopGroup);
 this.clientAuthRole = clientAuthRole;
-this.clientAuthData = clientAuthData;
 this.clientAuthMethod = clientAuthMethod;
 this.protocolVersion = protocolVersion;
+this.forwardAuthorizationCredentials = forwardAuthorizationCredentials;
+this.clientAuthDataSupplier = clientAuthDataSupplier;
 }
 
 @Override
-protected ByteBuf newConnectCommand() throws Exception {
-if (log.isDebugEnabled()) {
-log.debug("New Connection opened via ProxyClientCnx with params 
clientAuthRole = {},"
-+ " clientAuthData = {}, clientAuthMethod = {}",
+protected CompletableFuture newConnectCommand() throws Exception {
+authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+
+return clientAuthDataSupplier.get().thenApply(clientAuthData -> {
+if (log.isDebugEnabled()) {
+log.debug("New Connection opened via ProxyClientCnx with 
params clientAuthRole = {},"
++ " clientAuthData = {}, clientAuthMethod = {}",
 clientAuthRole, clientAuthData, clientAuthMethod);
+}
+
+return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
+PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
+clientAuthMethod);
+});
+}
+
+@Override
+protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+boolean isRefresh = Arrays.equals(
+AuthData.REFRESH_AUTH_DATA_BYTES,
+authChallenge.getChallenge().getAuthData()
+);
+
+if (!forwardAuthorizationCredentials || !isRefresh) {
+super.handleAuthChallenge(authChallenge);
+return;
 }
 
-authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
-AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
-PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
-clientAuthMethod);
+try {
+clientAuthDataSupplier.get()
+.thenAccept(authData -> sendAuthResponse(authData, 
clientAuthMethod));
+} catch (Exception e) {

Review Comment:
   Agreed. I will fix it.






[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token

2022-04-03 Thread GitBox


kkoderok commented on code in PR #13339:
URL: https://github.com/apache/pulsar/pull/13339#discussion_r841402861


##
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java:
##
@@ -363,6 +374,72 @@ public void 
testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Except
 log.info("-- Exiting {} test --", methodName);
 }
 
+@Test
+void testRefreshClientToken() throws Exception {
+log.info("-- Starting {} test --", methodName);
+
+startProxy();
+createAdminClient();
+
+@SuppressWarnings("unchecked")
+Supplier tokenSupplier = Mockito.mock(Supplier.class);
+when(tokenSupplier.get()).thenAnswer(answer -> 
createClientJwtToken(Duration.ofSeconds(1)));
+
+PulsarClient proxyClient = PulsarClient.builder()
+.serviceUrl(proxyService.getServiceUrl()).statsInterval(0, 
TimeUnit.SECONDS)
+.authentication(AuthenticationFactory.token(tokenSupplier))
+.operationTimeout(1000, TimeUnit.MILLISECONDS)
+.build();
+
+String namespaceName = "my-property/proxy-authorization/my-ns";
+admin.clusters().createCluster("proxy-authorization", 
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+admin.tenants().createTenant("my-property",
+new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), 
Sets.newHashSet("proxy-authorization")));
+admin.namespaces().createNamespace(namespaceName);
+
+admin.namespaces().grantPermissionOnNamespace(namespaceName, 
CLIENT_ROLE,
+Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+log.info("-- Admin permissions {} ---", 
admin.namespaces().getPermissions(namespaceName));
+
+Producer producer = proxyClient.newProducer(Schema.BYTES)
+
.topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create();
+
+final int msgs = 10;
+for (int i = 0; i < msgs; i++) {
+String message = "my-message-" + i;
+producer.send(message.getBytes());
+}
+
+//noinspection unchecked
+clearInvocations(tokenSupplier);
+Thread.sleep(3000);
+verify(tokenSupplier, atLeastOnce()).get();

Review Comment:
   OK, thanks. I'll check and fix the test.






Re: [VOTE] PIP-149: Making the REST Admin API fully async

2022-04-03 Thread mattison chao
Thanks for your participation.

Closing this vote with 2 binding +1s, 3 non-binding +1s, and no -1s.

On Wed, 30 Mar 2022 at 07:27, ZhangJian He  wrote:

> +1
>
> Thanks
> ZhangJian He
>
> On Tue, 29 Mar 2022 at 18:48, Zike Yang wrote:
>
> > +1,
> >
> > Zike Yang
> >
> > On 2022/03/29 09:22:16 guo jiwei wrote:
> > > +1
> > >
> > > Regards
> > > Jiwei Guo (Tboy)
> > >
> > >
> > > > On Tue, Mar 29, 2022 at 4:16 PM Enrico Olivelli wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Enrico
> > > >
> > > > > On Tue, 29 Mar 2022 at 09:55, PengHui Li wrote:
> > > > >
> > > > > +1,
> > > > >
> > > > > Penghui
> > > > >
> > > > > > On Fri, Mar 25, 2022 at 10:13 AM mattison chao <mattisonc...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > This is the voting thread for PIP-149. It will stay open for at least 48
> > > > > > > hours.
> > > > > > >
> > > > > > > https://github.com/apache/pulsar/issues/14365
> > > > > > >
> > > > > > > Pasted below for quoting convenience.
> > > > > > >
> > > > > > > -
> > > > > > >
> > > > > > > Motivation
> > > > > > >
> > > > > > > The REST API was originally designed to be implemented asynchronously, but
> > > > > > > as features were added, some synchronous implementations crept in, so many
> > > > > > > asynchronous methods now call synchronous code. Many of these synchronous
> > > > > > > calls also lack timeouts. This greatly reduces concurrency and degrades
> > > > > > > the user experience.
> > > > > > > To prevent further problems and to improve code readability and
> > > > > > > maintainability, we intend to refactor these synchronous calls and
> > > > > > > standardize the implementation of the API.
> > > > > > >
> > > > > > > Related discussion:
> > > > > > > https://lists.apache.org/thread/pkkz2jgwtzpksp6d4rdm1pyxzb3z6vmg
> > > > > > >
> > > > > > > Goals
> > > > > > >
> > > > > > > Try to avoid synchronous method calls in asynchronous methods.
> > > > > > > The async variable (AsyncResponse) is placed in the first parameter position.
> > > > > > > The async variable (AsyncResponse) must not be passed into method
> > > > > > > implementations.
> > > > > > > Add more tests and increase the coverage.
> > > > > > >
> > > > > > > Modification
> > > > > > >
> > > > > > > Avoid synchronous method calls in asynchronous methods.
> > > > > > >
> > > > > > > protected void internalDeleteNamespace(boolean authoritative) {
> > > > > > >     validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
> > > > > > >     validatePoliciesReadOnlyAccess();
> > > > > > > }
> > > > > > >
> > > > > > > Suggested instead:
> > > > > > >
> > > > > > > protected CompletableFuture internalDeleteNamespace(boolean authoritative) {
> > > > > > >     return validateTenantOperationAsync(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE)
> > > > > > >             .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
> > > > > > > }
> > > > > > >
> > > > > > > The async variable (AsyncResponse) is placed in the first parameter position.
> > > > > > >
> > > > > > > public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
> > > > > > >         @PathParam("tenant") String tenant,
> > > > > > >         @PathParam("namespace") String namespace,
> > > > > > >         @QueryParam("force") @DefaultValue("false") boolean force,
> > > > > > >         @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
> > > > > > >
> > > > > > > The async variable (AsyncResponse) must not be passed into method
> > > > > > > implementations.
> > > > > > >
> > > > > > > internalCreateNonPartitionedTopicAsync(asyncResponse, authoritative, properties);
> > > > > > >
> > > > > > > Suggested instead:
> > > > > > >
> > > > > > > internalCreateNonPartitionedTopicAsync(authoritative, properties)
> > > > > > >         .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
> > > > > > >         .exceptionally(ex -> {
> > > > > > >             resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
> > > > > > >             return null;
> > > > > > >         });
> > > > > > >
> > > > > > > Task tracking
> > > > > > >
> > > > > > > To coordinate the changes and keep track of which parts have been modified,
> > > > > > > it's better to open a tracking issue, like #14353, #14013, #13854.
> > > > > > >
> > > > > > > ---
> > > > > > > Best,
> > > > > > > Mattison
> > > > > > >
> > > >
> > >
> >
>
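
For readers of the archive, the pattern proposed in PIP-149 can be sketched in a self-contained form. This is only an illustration under stated assumptions: AsyncAdminSketch and its validators are hypothetical, and a plain Consumer of String stands in for the JAX-RS AsyncResponse so that the example runs without any dependencies. The internal method returns a future and never sees the response object; the endpoint takes the response as its first parameter and is the only place that resumes it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical sketch of the PIP-149 style; not Pulsar's actual admin code.
class AsyncAdminSketch {

    // Assumed async validator: fails the future instead of throwing.
    static CompletableFuture<Void> validateTenantOperationAsync(String tenant) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (tenant.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("tenant is required"));
        } else {
            future.complete(null);
        }
        return future;
    }

    // Assumed async validator that always succeeds in this sketch.
    static CompletableFuture<Void> validatePoliciesReadOnlyAccessAsync() {
        return CompletableFuture.completedFuture(null);
    }

    // Internal method: composes async steps and does not receive the response object.
    static CompletableFuture<Void> internalDeleteNamespaceAsync(String tenant) {
        return validateTenantOperationAsync(tenant)
                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
    }

    // Endpoint: the response is the first parameter and is resumed only here.
    static void deleteNamespace(Consumer<String> asyncResponse, String tenant) {
        internalDeleteNamespaceAsync(tenant)
                .thenAccept(__ -> asyncResponse.accept("204 No Content"))
                .exceptionally(ex -> {
                    // Dependent stages wrap failures in CompletionException; unwrap it.
                    Throwable cause = ex instanceof CompletionException && ex.getCause() != null
                            ? ex.getCause() : ex;
                    asyncResponse.accept("error: " + cause.getMessage());
                    return null;
                });
    }

    public static void main(String[] args) {
        deleteNamespace(System.out::println, "my-tenant");
        deleteNamespace(System.out::println, "");
    }
}
```

Keeping the response object out of the internal methods means they can be reused and tested as plain futures, with error mapping handled in one place.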