[GitHub] [pulsar-dotpulsar] VisualBean commented on issue #101: `AuthenticateUsingToken(Func> tokenFactory)` is not a part of 2.3.0
VisualBean commented on issue #101: URL: https://github.com/apache/pulsar-dotpulsar/issues/101#issuecomment-1086892520 I never found it, but ended up implementing `IAuthentication` instead, which worked great for OAuth using MSAL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15015: [enh][transaction] Optimize to reuse transaction buffer snapshot writer
Technoboy- commented on code in PR #15015: URL: https://github.com/apache/pulsar/pull/15015#discussion_r841313610 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java: ## @@ -18,69 +18,168 @@ */ package org.apache.pulsar.broker.service; +import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader; import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer; import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException; -import org.apache.pulsar.common.events.EventType; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.Backoff; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +@Slf4j public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService { -private final Map> clients; +private final Map> clients; private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory; -public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) { +private final ScheduledExecutorService 
scheduledExecutorService; +private final ConcurrentHashMap writerFutureMap; +private final LinkedList>> pendingCloseWriterList; + +// The class ReferenceCountedWriter will maintain the reference count, +// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed. +public static class ReferenceCountedWriter extends AbstractReferenceCounted { + +private final NamespaceName namespaceName; +private final SystemTopicBaseTxnBufferSnapshotService service; +private CompletableFuture> future; +private final Backoff backoff; + +protected ReferenceCountedWriter(NamespaceName namespaceName, + SystemTopicBaseTxnBufferSnapshotService service) { +this.namespaceName = namespaceName; +this.service = service; +this.backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); +initWriterFuture(); +} + +private void initWriterFuture() { +this.future = service.getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(); + this.future.thenRunAsync(this.backoff::reset).exceptionally(throwable -> { +long delay = backoff.next(); +log.error("[{}] Failed to new transaction buffer system topic writer," + +"try to re-create the writer in {} ms.", delay, namespaceName, throwable); +service.scheduledExecutorService.schedule( +SafeRun.safeRun(this::initWriterFuture), delay, TimeUnit.MILLISECONDS); +return null; +}); +} + +public CompletableFuture> getFuture() { +if (future == null) { +initWriterFuture(); +} +return future; +} + +@Override +protected void deallocate() { +ReferenceCountedWriter referenceCountedWriter = service.writerFutureMap.remove(namespaceName); +if (referenceCountedWriter != null && referenceCountedWriter.getFuture() != null) { + service.pendingCloseWriterList.add(referenceCountedWriter.getFuture()); +service.closePendingCloseWriter(); +} +} + +@Override +public ReferenceCounted touch(Object o) { +return this; +} + +} + +public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client, + 
ScheduledExecutorService scheduledExecutorService) { this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client); this.clients = new ConcurrentHashMap<>(); +this.scheduledExecutorService = scheduledExecutorService; +this.writerFutureMap = new ConcurrentHashMap<>(); +this.pendingClose
[GitHub] [pulsar-site] momo-jun commented on pull request #36: [feature][doc] Apply the new REST API topic to versioned docs
momo-jun commented on PR #36: URL: https://github.com/apache/pulsar-site/pull/36#issuecomment-1087050743 > @momo-jun why not update docs earlier than 2.7.2? Don't we need to update docs for all versions otherwise users do not have restapi doc entrance? I understand the maintenance rule is that only the versioned docs released in the last year are maintained and updated. Is it correct?
[GitHub] [pulsar] Jason918 commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.
Jason918 commented on code in PR #15017: URL: https://github.com/apache/pulsar/pull/15017#discussion_r841339916 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java: ## @@ -67,216 +67,144 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative, Integer coordinatorId) { -if (pulsar().getConfig().isTransactionCoordinatorEnabled()) { -if (coordinatorId != null) { - validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), -authoritative); -TransactionMetadataStore transactionMetadataStore = - pulsar().getTransactionMetadataStoreService().getStores() - .get(TransactionCoordinatorID.get(coordinatorId)); -if (transactionMetadataStore == null) { -asyncResponse.resume(new RestException(NOT_FOUND, -"Transaction coordinator not found! coordinator id : " + coordinatorId)); +if (coordinatorId != null) { + validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), +authoritative); +TransactionMetadataStore transactionMetadataStore = +pulsar().getTransactionMetadataStoreService().getStores() +.get(TransactionCoordinatorID.get(coordinatorId)); +if (transactionMetadataStore == null) { +asyncResponse.resume(new RestException(NOT_FOUND, +"Transaction coordinator not found! 
coordinator id : " + coordinatorId)); +return; +} + asyncResponse.resume(transactionMetadataStore.getCoordinatorStats()); +} else { + getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN, +false, false).thenAccept(partitionMetadata -> { +if (partitionMetadata.partitions == 0) { +asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, +"Transaction coordinator not found")); return; } - asyncResponse.resume(transactionMetadataStore.getCoordinatorStats()); -} else { - getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN, -false, false).thenAccept(partitionMetadata -> { -if (partitionMetadata.partitions == 0) { -asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, -"Transaction coordinator not found")); +List> transactionMetadataStoreInfoFutures = +Lists.newArrayList(); +for (int i = 0; i < partitionMetadata.partitions; i++) { +try { +transactionMetadataStoreInfoFutures + .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i)); +} catch (PulsarServerException e) { +asyncResponse.resume(new RestException(e)); return; } -List> transactionMetadataStoreInfoFutures = -Lists.newArrayList(); -for (int i = 0; i < partitionMetadata.partitions; i++) { +} +Map stats = new HashMap<>(); + FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> { +if (e != null) { +asyncResponse.resume(new RestException(e)); +return; +} + +for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) { try { -transactionMetadataStoreInfoFutures - .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i)); -} catch (PulsarServerException e) { -asyncResponse.resume(new RestException(e)); +stats.put(i, transactionMetadataStoreInfoFutures.get(i).get()); +} catch (Exception exception) { +asyncResponse.resume(new RestException(exception.getCause())); return; } } -Map stats = new HashMap<>(); - 
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> { -if (e != null) { -asyncResponse.resume(new RestException(e));
[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #14985: Add a cache eviction policy:Evicting cache data by the slowest markDeletedPosition
lordcheng10 commented on code in PR #14985: URL: https://github.com/apache/pulsar/pull/14985#discussion_r84115 ## managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java: ## @@ -291,6 +291,185 @@ public void acknowledge1() throws Exception { ledger.close(); } +@Test +public void testCacheEvictionByMarkDeletedPosition() throws Throwable { +final CountDownLatch counter = new CountDownLatch(1); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setCacheEvictionByMarkDeletedPosition(true); +factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS +.toNanos(3)); +factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { +@Override +public void openLedgerComplete(ManagedLedger ledger, Object ctx) { +ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { +@Override +public void openCursorComplete(ManagedCursor cursor, Object ctx) { +ManagedLedger ledger = (ManagedLedger) ctx; +String message1 = "test"; +ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { +@Override +public void addComplete(Position position, ByteBuf entryData, Object ctx) { +@SuppressWarnings("unchecked") +Pair pair = (Pair) ctx; +ManagedLedger ledger = pair.getLeft(); +ManagedCursor cursor = pair.getRight(); +assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + +cursor.asyncReadEntries(1, new ReadEntriesCallback() { +@Override +public void readEntriesComplete(List entries, Object ctx) { +ManagedCursor cursor = (ManagedCursor) ctx; +assertEquals(entries.size(), 1); +Entry entry = entries.get(0); +final Position position = entry.getPosition(); +assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); +((ManagedLedgerImpl) ledger).doCacheEviction( +System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(3)); +assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + +log.debug("Mark-Deleting to position {}", position); 
+cursor.asyncMarkDelete(position, new MarkDeleteCallback() { +@Override +public void markDeleteComplete(Object ctx) { +log.debug("Mark delete complete"); +ManagedCursor cursor = (ManagedCursor) ctx; + assertFalse(cursor.hasMoreEntries()); +((ManagedLedgerImpl) ledger).doCacheEviction( +System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(3)); + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); + +counter.countDown(); +} + +@Override +public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { +fail(exception.getMessage()); +} + +}, cursor); +} + +@Override +public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { +fail(exception.getMessage()); +} Review Comment: I call 'fail' directly locally, which can cause the test to fail. @eolivelli I have tried the following ways: public void testCacheEvictionByMarkDeletedPosition() throws Throwable{ fail("xxx"); . } public void testCacheEvictionByMarkDeletedPosition() throws Throwable{ . public void addComplete(Position position, ByteBuf entryData, Object c
[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token
kkoderok commented on code in PR #13339: URL: https://github.com/apache/pulsar/pull/13339#discussion_r841390236 ## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java: ## @@ -439,11 +451,65 @@ ClientConfigurationData createClientConfiguration() throws UnsupportedAuthentica private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData, final String clientAuthMethod, final int protocolVersion) throws PulsarClientException { this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(), -() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData, -clientAuthMethod, protocolVersion)); +() -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, +this::getOrRefreshClientAuthData, clientAuthMethod, protocolVersion, + service.getConfiguration().isForwardAuthorizationCredentials())); return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer()); } +private void updateClientAuthData(AuthData clientData) { +this.clientAuthData = clientData; +authFutureList.getAndSet(Collections.emptyList()) +.forEach(future -> future.complete(clientData)); +} + +private CompletableFuture getOrRefreshClientAuthData() { Review Comment: When the client auth data has expired and the proxy is configured to forward authentication data, this method requests a new token. This method is thread safe because one ProxyConnection has multiple ProxyClientCnx instances, which may request auth data at the same time.
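Editor's sketch: the thread-safety concern in the comment above (several connections asking for fresh auth data at once) can be illustrated with a single shared in-flight future. This is a different technique from the `authFutureList` in the diff, and `SharedRefresh` with its methods is a hypothetical name chosen for illustration, not a Pulsar class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper (not a Pulsar class): concurrent callers share one refresh.
class SharedRefresh<T> {
    private final Supplier<CompletableFuture<T>> refresher;
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();

    SharedRefresh(Supplier<CompletableFuture<T>> refresher) {
        this.refresher = refresher;
    }

    CompletableFuture<T> get() {
        while (true) {
            CompletableFuture<T> existing = inFlight.get();
            if (existing != null) {
                return existing; // a refresh is already running, so join it
            }
            CompletableFuture<T> mine = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, mine)) {
                start(mine);
                return mine;
            }
            // Lost the race to another caller; loop and pick up its future.
        }
    }

    private void start(CompletableFuture<T> mine) {
        try {
            refresher.get().whenComplete((value, error) -> {
                // Clear the slot first so the next call triggers a new refresh.
                inFlight.compareAndSet(mine, null);
                if (error != null) {
                    mine.completeExceptionally(error);
                } else {
                    mine.complete(value);
                }
            });
        } catch (RuntimeException e) {
            // The supplier itself failed; do not leave a stuck future behind.
            inFlight.compareAndSet(mine, null);
            mine.completeExceptionally(e);
        }
    }
}
```

With this shape, callers that arrive while a refresh is pending all receive the same future, and the supplier runs again only after that refresh has finished.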
[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token
kkoderok commented on code in PR #13339: URL: https://github.com/apache/pulsar/pull/13339#discussion_r841398032 ## pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java: ## @@ -240,16 +253,23 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.close(); } }); +} else { +log.warn("Error during handshake", th); +ctx.close(); +} +}); } -protected ByteBuf newConnectCommand() throws Exception { +protected CompletableFuture newConnectCommand() throws Exception { // mutual authentication is to auth between `remoteHostName` and this client for this channel. // each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data, // and return authData to server. authenticationDataProvider = authentication.getAuthData(remoteHostName); AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); -return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, -PulsarVersion.getVersion(), proxyToTargetBrokerAddress, null, null, null); +return CompletableFuture.completedFuture( Review Comment: This method is overridden in the ProxyClientCnx class, which can use the asynchronous operation of obtaining authorization data from the client, so this method returns a CompletableFuture
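Editor's sketch: the design choice described in the comment above is that a base method returns an already completed future so that a subclass can override it with a truly asynchronous lookup. The classes `BaseCnx` and `ForwardingCnx` are hypothetical stand-ins for `ClientCnx` and `ProxyClientCnx`, and commands are modeled as plain strings rather than Netty buffers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for ClientCnx; commands are plain strings here.
class BaseCnx {
    // The base class has its data at hand, so it wraps the result in a completed future.
    protected CompletableFuture<String> newConnectCommand() {
        return CompletableFuture.completedFuture("CONNECT");
    }

    // Callers chain on the future and never block, whichever variant is used.
    CompletableFuture<String> connect() {
        return newConnectCommand().thenApply(cmd -> "sent " + cmd);
    }
}

// Hypothetical stand-in for ProxyClientCnx: the command depends on async auth data.
class ForwardingCnx extends BaseCnx {
    private final Supplier<CompletableFuture<String>> authDataSupplier;

    ForwardingCnx(Supplier<CompletableFuture<String>> authDataSupplier) {
        this.authDataSupplier = authDataSupplier;
    }

    @Override
    protected CompletableFuture<String> newConnectCommand() {
        // The command is built only once the auth data becomes available.
        return authDataSupplier.get().thenApply(auth -> "CONNECT auth=" + auth);
    }
}
```

Because `connect()` only chains on the returned future, it works unchanged for both the synchronous base case and the asynchronous override.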
[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token
kkoderok commented on code in PR #13339: URL: https://github.com/apache/pulsar/pull/13339#discussion_r841401389 ## pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java: ## @@ -20,43 +20,93 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ProxyClientCnx extends ClientCnx { -String clientAuthRole; -AuthData clientAuthData; -String clientAuthMethod; -int protocolVersion; +private String clientAuthRole; +private String clientAuthMethod; +private int protocolVersion; +private boolean forwardAuthorizationCredentials; +private Supplier> clientAuthDataSupplier; public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole, - AuthData clientAuthData, String clientAuthMethod, int protocolVersion) { + Supplier> clientAuthDataSupplier, + String clientAuthMethod, int protocolVersion, boolean forwardAuthorizationCredentials) { super(conf, eventLoopGroup); this.clientAuthRole = clientAuthRole; -this.clientAuthData = clientAuthData; this.clientAuthMethod = clientAuthMethod; this.protocolVersion = protocolVersion; +this.forwardAuthorizationCredentials = forwardAuthorizationCredentials; +this.clientAuthDataSupplier = clientAuthDataSupplier; } @Override -protected ByteBuf newConnectCommand() throws Exception { -if (log.isDebugEnabled()) { -log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," -+ " clientAuthData = {}, clientAuthMethod = {}", +protected CompletableFuture 
newConnectCommand() throws Exception { +authenticationDataProvider = authentication.getAuthData(remoteHostName); +AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); + +return clientAuthDataSupplier.get().thenApply(clientAuthData -> { +if (log.isDebugEnabled()) { +log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {}," ++ " clientAuthData = {}, clientAuthMethod = {}", clientAuthRole, clientAuthData, clientAuthMethod); +} + +return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, +PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, +clientAuthMethod); +}); +} + +@Override +protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { +boolean isRefresh = Arrays.equals( +AuthData.REFRESH_AUTH_DATA_BYTES, +authChallenge.getChallenge().getAuthData() +); + +if (!forwardAuthorizationCredentials || !isRefresh) { +super.handleAuthChallenge(authChallenge); +return; } -authenticationDataProvider = authentication.getAuthData(remoteHostName); -AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA); -return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion, -PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData, -clientAuthMethod); +try { +clientAuthDataSupplier.get() +.thenAccept(authData -> sendAuthResponse(authData, clientAuthMethod)); +} catch (Exception e) { Review Comment: Agree. I will fix.
[GitHub] [pulsar] kkoderok commented on a diff in pull request #13339: [Issue 10816][Proxy] Refresh client auth token
kkoderok commented on code in PR #13339: URL: https://github.com/apache/pulsar/pull/13339#discussion_r841402861 ## pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyWithJwtAuthorizationTest.java: ## @@ -363,6 +374,72 @@ public void testProxyAuthorizationWithPrefixSubscriptionAuthMode() throws Except log.info("-- Exiting {} test --", methodName); } +@Test +void testRefreshClientToken() throws Exception { +log.info("-- Starting {} test --", methodName); + +startProxy(); +createAdminClient(); + +@SuppressWarnings("unchecked") +Supplier tokenSupplier = Mockito.mock(Supplier.class); +when(tokenSupplier.get()).thenAnswer(answer -> createClientJwtToken(Duration.ofSeconds(1))); + +PulsarClient proxyClient = PulsarClient.builder() +.serviceUrl(proxyService.getServiceUrl()).statsInterval(0, TimeUnit.SECONDS) +.authentication(AuthenticationFactory.token(tokenSupplier)) +.operationTimeout(1000, TimeUnit.MILLISECONDS) +.build(); + +String namespaceName = "my-property/proxy-authorization/my-ns"; +admin.clusters().createCluster("proxy-authorization", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); +admin.tenants().createTenant("my-property", +new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("proxy-authorization"))); +admin.namespaces().createNamespace(namespaceName); + +admin.namespaces().grantPermissionOnNamespace(namespaceName, CLIENT_ROLE, +Sets.newHashSet(AuthAction.consume, AuthAction.produce)); +log.info("-- Admin permissions {} ---", admin.namespaces().getPermissions(namespaceName)); + +Producer producer = proxyClient.newProducer(Schema.BYTES) + .topic("persistent://my-property/proxy-authorization/my-ns/my-topic1").create(); + +final int msgs = 10; +for (int i = 0; i < msgs; i++) { +String message = "my-message-" + i; +producer.send(message.getBytes()); +} + +//noinspection unchecked +clearInvocations(tokenSupplier); +Thread.sleep(3000); +verify(tokenSupplier, atLeastOnce()).get(); Review Comment: Ok. thanks. 
I'll check and fix the test
Re: [VOTE] PIP-149: Making the REST Admin API fully async
Thanks for your participation.
Closing this vote with 2 binding +1 votes, 3 non-binding +1 votes, and no -1 votes.

On Wed, 30 Mar 2022 at 07:27, ZhangJian He wrote:

> +1
>
> Thanks
> ZhangJian He
>
> On Tue, Mar 29, 2022 at 18:48, Zike Yang wrote:
>
> > +1,
> >
> > Zike Yang
> >
> > On 2022/03/29 09:22:16 guo jiwei wrote:
> > > +1
> > >
> > > Regards
> > > Jiwei Guo (Tboy)
> > >
> > >
> > > On Tue, Mar 29, 2022 at 4:16 PM Enrico Olivelli wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Enrico
> > > >
> > > > On Tue, Mar 29, 2022 at 09:55, PengHui Li wrote:
> > > > >
> > > > > +1,
> > > > >
> > > > > Penghui
> > > > >
> > > > > On Fri, Mar 25, 2022 at 10:13 AM mattison chao <mattisonc...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > This is the voting thread for PIP-149. It will stay open for at least 48
> > > > > > hours.
> > > > > >
> > > > > > https://github.com/apache/pulsar/issues/14365
> > > > > >
> > > > > > Pasted below for quoting convenience.
> > > > > >
> > > > > > -
> > > > > >
> > > > > > Motivation
> > > > > >
> > > > > > The Rest API was originally designed to be implemented asynchronously, but
> > > > > > with the iteration of functions, some synchronous implementations were
> > > > > > added, resulting in many asynchronous methods called synchronous
> > > > > > implementations. Also, many synchronous calls do not add timeouts. This
> > > > > > greatly reduces concurrency, user operations, and experience.
> > > > > > In order to prevent more problems, and improve code readability and
> > > > > > maintainability, we intend to refactor these synchronous calls and
> > > > > > standardize the implementation of the API.
> > > > > >
> > > > > > Related discussion:
> > > > > > https://lists.apache.org/thread/pkkz2jgwtzpksp6d4rdm1pyxzb3z6vmg
> > > > > >
> > > > > > Goals
> > > > > >
> > > > > > Try to avoid synchronous method calls in asynchronous methods.
> > > > > > Async variable (AsyncResponse) is placed in the first parameter position.
> > > > > > Async variable (AsyncResponse) cannot be substituted into method
> > > > > > implementations.
> > > > > > Add more tests and increase the coverage.
> > > > > >
> > > > > > Modification
> > > > > >
> > > > > > Avoid synchronous method calls in asynchronous methods.
> > > > > >
> > > > > > protected void internalDeleteNamespace(boolean authoritative) {
> > > > > >    validateTenantOperation(namespaceName.getTenant(),
> > > > > >        TenantOperation.DELETE_NAMESPACE);
> > > > > >    validatePoliciesReadOnlyAccess();
> > > > > > }
> > > > > >
> > > > > > Suggest to do like this:
> > > > > >
> > > > > > protected CompletableFuture internalDeleteNamespace(boolean authoritative) {
> > > > > >    return validateTenantOperationAsync(namespaceName.getTenant(),
> > > > > >            TenantOperation.DELETE_NAMESPACE)
> > > > > >        .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync());
> > > > > > }
> > > > > >
> > > > > > Async variable (AsyncResponse) is placed in the first parameter position
> > > > > >
> > > > > > public void deleteNamespace(@Suspended final AsyncResponse asyncResponse,
> > > > > >         @PathParam("tenant") String tenant,
> > > > > >         @PathParam("namespace") String namespace,
> > > > > >         @QueryParam("force") @DefaultValue("false") boolean force,
> > > > > >         @QueryParam("authoritative") @DefaultValue("false") boolean
> > > > > >         authoritative) {
> > > > > >
> > > > > > Async variable (AsyncResponse) cannot be substituted into method
> > > > > > implementations
> > > > > >
> > > > > > internalCreateNonPartitionedTopicAsync(asyncResponse, authoritative,
> > > > > >     properties);
> > > > > >
> > > > > > Suggest to do like this:
> > > > > >
> > > > > > internalCreateNonPartitionedTopicAsync(authoritative, properties)
> > > > > >         .thenAccept(__ ->
> > > > > >             asyncResponse.resume(Response.noContent().build()))
> > > > > >         .exceptionally(ex -> {
> > > > > >             resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
> > > > > >             return null;
> > > > > >         });
> > > > > >
> > > > > > Task tracking
> > > > > >
> > > > > > In order to unify the modification and track the modified part, it's better
> > > > > > to open an issue to track, like #14353, #14013, #13854.
> > > > > >
> > > > > > ---
> > > > > > Best,
> > > > > > Mattison
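Editor's sketch: the conventions proposed in PIP-149 (compose asynchronous validations, keep the response object out of internal methods, resume it only at the endpoint) can be tried out without a broker. Everything here is hypothetical and simplified: `NamespaceResource`, `ResponseSink` (standing in for JAX-RS `AsyncResponse`), and the two validation methods are illustrative names, not Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical, simplified resource; ResponseSink stands in for JAX-RS AsyncResponse.
class NamespaceResource {
    interface ResponseSink {
        void resume(Object result);
    }

    // Hypothetical validation step that fails for an empty tenant name.
    CompletableFuture<Void> validateTenantAsync(String tenant) {
        if (tenant.isEmpty()) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("tenant is required"));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical second validation step that always succeeds.
    CompletableFuture<Void> validateWritableAsync() {
        return CompletableFuture.completedFuture(null);
    }

    // Internal method: no response object, just a composed future.
    CompletableFuture<Void> internalDeleteNamespaceAsync(String tenant) {
        return validateTenantAsync(tenant).thenCompose(ignored -> validateWritableAsync());
    }

    // Endpoint: the response object is the first parameter and is resumed only here.
    void deleteNamespace(ResponseSink response, String tenant) {
        internalDeleteNamespaceAsync(tenant)
                .thenAccept(ignored -> response.resume("204 No Content"))
                .exceptionally(ex -> {
                    // Dependent stages wrap failures in CompletionException; unwrap it.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    response.resume(cause);
                    return null;
                });
    }
}
```

Keeping the response object at the endpoint means the internal method can be reused and tested as a plain future, and every failure path ends in exactly one `resume` call.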