kkoderok commented on code in PR #13339: URL: https://github.com/apache/pulsar/pull/13339#discussion_r841401389
##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########

@@ -20,43 +20,93 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 public class ProxyClientCnx extends ClientCnx {
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private String clientAuthRole;
+    private String clientAuthMethod;
+    private int protocolVersion;
+    private boolean forwardAuthorizationCredentials;
+    private Supplier<CompletableFuture<AuthData>> clientAuthDataSupplier;
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, int protocolVersion) {
+                          Supplier<CompletableFuture<AuthData>> clientAuthDataSupplier,
+                          String clientAuthMethod, int protocolVersion, boolean forwardAuthorizationCredentials) {
         super(conf, eventLoopGroup);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
         this.clientAuthMethod = clientAuthMethod;
         this.protocolVersion = protocolVersion;
+        this.forwardAuthorizationCredentials = forwardAuthorizationCredentials;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
     }
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
-                    + " clientAuthData = {}, clientAuthMethod = {}",
+    protected CompletableFuture<ByteBuf> newConnectCommand() throws Exception {
+        authenticationDataProvider = authentication.getAuthData(remoteHostName);
+        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+
+        return clientAuthDataSupplier.get().thenApply(clientAuthData -> {
+            if (log.isDebugEnabled()) {
+                log.debug("New Connection opened via ProxyClientCnx with params clientAuthRole = {},"
+                        + " clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
+            }
+
+            return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
+                    PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
+                    clientAuthMethod);
+        });
+    }
+
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        boolean isRefresh = Arrays.equals(
+                AuthData.REFRESH_AUTH_DATA_BYTES,
+                authChallenge.getChallenge().getAuthData()
+        );
+
+        if (!forwardAuthorizationCredentials || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
         }
-        authenticationDataProvider = authentication.getAuthData(remoteHostName);
-        AuthData authData = authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
-                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, clientAuthRole, clientAuthData,
-                clientAuthMethod);
+        try {
+            clientAuthDataSupplier.get()
+                    .thenAccept(authData -> sendAuthResponse(authData, clientAuthMethod));
+        } catch (Exception e) {

Review Comment:
Agree. I will fix.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org