Copilot commented on code in PR #17835:
URL: https://github.com/apache/pinot/pull/17835#discussion_r2902933999
##########
pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java:
##########
@@ -63,13 +67,73 @@ public GrpcConnection(Properties properties, List<String>
brokerList) {
}
public GrpcConnection(Properties properties, BrokerSelector brokerSelector) {
+ this(properties, brokerSelector,
ConnectionUtils.getHeadersFromProperties(properties));
+ }
+
+ GrpcConnection(Properties properties, BrokerSelector brokerSelector,
Map<String, String> defaultMetadata) {
+ this(properties, brokerSelector, createGrpcQueryClient(properties),
defaultMetadata);
+ }
+
+ GrpcConnection(Properties properties, BrokerSelector brokerSelector,
BrokerStreamingQueryClient grpcQueryClient) {
+ this(properties, brokerSelector, grpcQueryClient,
ConnectionUtils.getHeadersFromProperties(properties));
+ }
+
+ GrpcConnection(Properties properties, BrokerSelector brokerSelector,
BrokerStreamingQueryClient grpcQueryClient,
+ Map<String, String> defaultMetadata) {
_brokerSelector = brokerSelector;
- // Convert Properties properties to a Map
- Map<String, Object> propertiesMap = new HashMap<>();
- properties.forEach((key, value) -> propertiesMap.put(key.toString(),
value));
- _grpcQueryClient = new BrokerStreamingQueryClient(new GrpcConfig(new
PinotConfiguration(propertiesMap)));
+ _grpcQueryClient = grpcQueryClient;
+ _defaultMetadata = Map.copyOf(defaultMetadata);
// Default fail Pinot query if response contains any exception.
_failOnExceptions =
Boolean.parseBoolean(properties.getProperty(FAIL_ON_EXCEPTIONS, "TRUE"));
+ validateConnection();
+ }
+
+ private static BrokerStreamingQueryClient createGrpcQueryClient(Properties
properties) {
+ Map<String, Object> propertiesMap = new HashMap<>();
+ properties.forEach((key, value) -> propertiesMap.put(key.toString(),
value));
+ return new BrokerStreamingQueryClient(new GrpcConfig(new
PinotConfiguration(propertiesMap)));
+ }
+
+ private void validateConnection() {
+ try {
+ BrokerResponse brokerResponse =
+ BrokerResponse.fromJson(getJsonResponse(CONNECTION_VALIDATION_QUERY,
Collections.emptyMap()));
+ if (brokerResponse.hasExceptions()) {
+ throw new PinotClientException(
+ "Failed to establish gRPC broker connection: " +
brokerResponse.getExceptions());
+ }
+ } catch (PinotClientException e) {
+ closeQuietly();
+ throw e;
+ } catch (Exception e) {
+ closeQuietly();
+ throw new PinotClientException("Failed to establish gRPC broker
connection", e);
+ }
+ }
Review Comment:
The test only covers the happy path of connection validation. Consider
adding tests for both failure paths: (1) the broker response contains
exceptions, and (2) the validation query itself throws (e.g., the broker is
unreachable). Each test should verify that `closeQuietly()` is invoked and that
the constructor throws a `PinotClientException`. For instance, you could have
`RecordingBrokerStreamingQueryClient.submit()` throw a `RuntimeException` and
assert that the constructor fails with a `PinotClientException` rather than
leaking the raw exception.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]