This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c3c55a7e848 [fix][ws] Fix WebSocket proxy originalPrincipal for HTTP
admin API calls (#24613)
c3c55a7e848 is described below
commit c3c55a7e84845f2749e781b485f385f7e6be4983
Author: Penghui Li <[email protected]>
AuthorDate: Fri Aug 8 10:31:42 2025 -0700
[fix][ws] Fix WebSocket proxy originalPrincipal for HTTP admin API calls
(#24613)
Co-authored-by: Claude <[email protected]>
---
...IntegrationTest.java => ProxyRoleAuthTest.java} | 27 ++++++++-------
.../proxy/ProxyRoleAuthWebServiceURLTest.java | 38 ++++++++++++++++++++++
.../org/apache/pulsar/client/impl/HttpClient.java | 8 +++++
3 files changed, 62 insertions(+), 11 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
similarity index 98%
rename from
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
rename to
pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
index 624c2522822..595991c40aa 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/WebSocketProxyAuthIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthTest.java
@@ -68,9 +68,9 @@ import org.testng.annotations.Test;
* 2. testWebSocketProxyWithUnauthorizedToken: Negative test with unauthorized
tokens
*/
@Test(groups = "websocket")
-public class WebSocketProxyAuthIntegrationTest extends ProducerConsumerBase {
+public class ProxyRoleAuthTest extends ProducerConsumerBase {
- private static final Logger log =
LoggerFactory.getLogger(WebSocketProxyAuthIntegrationTest.class);
+ private static final Logger log =
LoggerFactory.getLogger(ProxyRoleAuthTest.class);
// JWT token authentication setup with different roles
private static final SecretKey SECRET_KEY =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
@@ -129,6 +129,19 @@ public class WebSocketProxyAuthIntegrationTest extends
ProducerConsumerBase {
// Setup namespace and grant permissions for client role
setupNamespacePermissions();
+ WebSocketProxyConfiguration proxyConfig = getProxyConfig();
+
+ service = spyWithClassAndConstructorArgs(WebSocketService.class,
proxyConfig);
+ doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(service)
+ .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
+
+ proxyServer = new ProxyServer(proxyConfig);
+ WebSocketServiceStarter.start(proxyServer, service);
+
+ log.info("WebSocket Proxy Server started on port: {}",
proxyServer.getListenPortHTTP().get());
+ }
+
+ protected WebSocketProxyConfiguration getProxyConfig() {
// Create WebSocket proxy configuration with authentication and
authorization enabled
WebSocketProxyConfiguration proxyConfig = new
WebSocketProxyConfiguration();
proxyConfig.setWebServicePort(Optional.of(0));
@@ -152,15 +165,7 @@ public class WebSocketProxyAuthIntegrationTest extends
ProducerConsumerBase {
// Set broker service URL to connect to our test broker
proxyConfig.setBrokerServiceUrl(pulsar.getBrokerServiceUrl());
proxyConfig.setBrokerServiceUrlTls(pulsar.getBrokerServiceUrlTls());
-
- service = spyWithClassAndConstructorArgs(WebSocketService.class,
proxyConfig);
- doReturn(registerCloseable(new
ZKMetadataStore(mockZooKeeperGlobal))).when(service)
- .createConfigMetadataStore(anyString(), anyInt(),
anyBoolean());
-
- proxyServer = new ProxyServer(proxyConfig);
- WebSocketServiceStarter.start(proxyServer, service);
-
- log.info("WebSocket Proxy Server started on port: {}",
proxyServer.getListenPortHTTP().get());
+ return proxyConfig;
}
@AfterMethod(alwaysRun = true)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
new file mode 100644
index 00000000000..f726178aabc
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyRoleAuthWebServiceURLTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.websocket.proxy;
+
+import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
+
+/**
+ * Same tests as ProxyRoleAuthTest, but with the proxy configured to use the web service (HTTP) URL instead of the broker service URL.
+ */
+public class ProxyRoleAuthWebServiceURLTest extends ProxyRoleAuthTest {
+
+ @Override
+ protected WebSocketProxyConfiguration getProxyConfig() {
+ // Start from the base proxy configuration (authentication and authorization enabled), then use the web service URLs and clear the broker service URLs
+ WebSocketProxyConfiguration proxyConfig = super.getProxyConfig();
+ proxyConfig.setServiceUrl(pulsar.getWebServiceAddress());
+ proxyConfig.setServiceUrlTls(pulsar.getWebServiceAddressTls());
+ proxyConfig.setBrokerServiceUrl(null);
+ proxyConfig.setBrokerServiceUrlTls(null);
+ return proxyConfig;
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index a86e820af6b..f9312be39aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -59,17 +59,20 @@ import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
@Slf4j
public class HttpClient implements Closeable {
+ private static final String ORIGINAL_PRINCIPAL_HEADER =
"X-Original-Principal";
protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
protected final AsyncHttpClient httpClient;
protected final ServiceNameResolver serviceNameResolver;
protected final Authentication authentication;
+ protected final ClientConfigurationData clientConf;
protected ScheduledExecutorService executorService;
protected PulsarSslFactory sslFactory;
protected HttpClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
this.authentication = conf.getAuthentication();
+ this.clientConf = conf;
this.serviceNameResolver = new
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
conf.getServiceUrlQuarantineMaxDurationMs());
this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
@@ -194,6 +197,11 @@ public class HttpClient implements Closeable {
}
}
+ // Add X-Original-Principal header if originalPrincipal is
configured (for proxy scenarios)
+ if (clientConf.getOriginalPrincipal() != null) {
+ builder.addHeader(ORIGINAL_PRINCIPAL_HEADER,
clientConf.getOriginalPrincipal());
+ }
+
builder.execute().toCompletableFuture().whenComplete((response2, t) -> {
if (t != null) {
serviceNameResolver.markHostAvailability(