kkoderok commented on code in PR #13339:
URL: https://github.com/apache/pulsar/pull/13339#discussion_r841401389


##########
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyClientCnx.java:
##########
@@ -20,43 +20,93 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.EventLoopGroup;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.protocol.Commands;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ProxyClientCnx extends ClientCnx {
 
-    String clientAuthRole;
-    AuthData clientAuthData;
-    String clientAuthMethod;
-    int protocolVersion;
+    private String clientAuthRole;
+    private String clientAuthMethod;
+    private int protocolVersion;
+    private boolean forwardAuthorizationCredentials;
+    private Supplier<CompletableFuture<AuthData>> clientAuthDataSupplier;
 
     public ProxyClientCnx(ClientConfigurationData conf, EventLoopGroup 
eventLoopGroup, String clientAuthRole,
-                          AuthData clientAuthData, String clientAuthMethod, 
int protocolVersion) {
+                          Supplier<CompletableFuture<AuthData>> 
clientAuthDataSupplier,
+                          String clientAuthMethod, int protocolVersion, 
boolean forwardAuthorizationCredentials) {
         super(conf, eventLoopGroup);
         this.clientAuthRole = clientAuthRole;
-        this.clientAuthData = clientAuthData;
         this.clientAuthMethod = clientAuthMethod;
         this.protocolVersion = protocolVersion;
+        this.forwardAuthorizationCredentials = forwardAuthorizationCredentials;
+        this.clientAuthDataSupplier = clientAuthDataSupplier;
     }
 
     @Override
-    protected ByteBuf newConnectCommand() throws Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("New Connection opened via ProxyClientCnx with params 
clientAuthRole = {},"
-                            + " clientAuthData = {}, clientAuthMethod = {}",
+    protected CompletableFuture<ByteBuf> newConnectCommand() throws Exception {
+        authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
+        AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+
+        return clientAuthDataSupplier.get().thenApply(clientAuthData -> {
+            if (log.isDebugEnabled()) {
+                log.debug("New Connection opened via ProxyClientCnx with 
params clientAuthRole = {},"
+                        + " clientAuthData = {}, clientAuthMethod = {}",
                     clientAuthRole, clientAuthData, clientAuthMethod);
+            }
+
+            return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
+                PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
+                clientAuthMethod);
+        });
+    }
+
+    @Override
+    protected void handleAuthChallenge(CommandAuthChallenge authChallenge) {
+        boolean isRefresh = Arrays.equals(
+            AuthData.REFRESH_AUTH_DATA_BYTES,
+            authChallenge.getChallenge().getAuthData()
+        );
+
+        if (!forwardAuthorizationCredentials || !isRefresh) {
+            super.handleAuthChallenge(authChallenge);
+            return;
         }
 
-        authenticationDataProvider = 
authentication.getAuthData(remoteHostName);
-        AuthData authData = 
authenticationDataProvider.authenticate(AuthData.INIT_AUTH_DATA);
-        return Commands.newConnect(authentication.getAuthMethodName(), 
authData, this.protocolVersion,
-            PulsarVersion.getVersion(), proxyToTargetBrokerAddress, 
clientAuthRole, clientAuthData,
-            clientAuthMethod);
+        try {
+            clientAuthDataSupplier.get()
+                .thenAccept(authData -> sendAuthResponse(authData, 
clientAuthMethod));
+        } catch (Exception e) {

Review Comment:
   Agree. I will fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to