Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2848682916
##########
pinot-tools/src/main/java/org/apache/pinot/tools/QuickStartBase.java:
##########
@@ -1023,4 +1042,67 @@ private static String firstNonEmpty(String... values) {
}
return null;
}
+
+ protected void createFineFoodReviewsFederatedTable() {
+ if (!useDefaultBootstrapTableDir()) {
+ return;
+ }
+ Map<String, String> streamTableDirectories = getDefaultStreamTableDirectories();
+ if (!streamTableDirectories.containsKey("fineFoodReviews-part-0")
+ || !streamTableDirectories.containsKey("fineFoodReviews-part-1")) {
+ return;
+ }
+ String logicalTableName = "fineFoodReviews-federated";
+ try {
+ Schema schema = loadSchemaFromResource("/examples/stream/fineFoodReviews/fineFoodReviews_schema.json");
+ schema.setSchemaName(logicalTableName);
+ createSchemaOnController(schema, logicalTableName);
+
+ LogicalTableConfig logicalTableConfig = new LogicalTableConfig();
+ logicalTableConfig.setTableName(logicalTableName);
+ logicalTableConfig.setBrokerTenant("DefaultTenant");
+ logicalTableConfig.setRefRealtimeTableName("fineFoodReviews-part-0_REALTIME");
+ logicalTableConfig.setPhysicalTableConfigMap(Map.of(
+ "fineFoodReviews-part-0_REALTIME", new PhysicalTableConfig(),
+ "fineFoodReviews-part-1_REALTIME", new PhysicalTableConfig()
+ ));
+
+ String logicalTableUrl = "http://localhost:" + QuickstartRunner.DEFAULT_CONTROLLER_PORT + "/logicalTables";
+ AbstractBaseAdminCommand.sendPostRequest(logicalTableUrl, logicalTableConfig.toSingleLineJsonString());
+ printStatus(Quickstart.Color.GREEN,
+ "***** Logical table fineFoodReviews-federated created successfully *****");
+ } catch (Exception e) {
Review Comment:
This `catch (Exception e)` block only prints `e.getMessage()` and then
continues, so the stack trace and root cause are lost and quickstart failures
are hard to debug. Consider logging the exception itself through the existing
logger, so the stack trace is available when a failure needs to be diagnosed.
```suggestion
} catch (Exception e) {
LOGGER.warn("Logical table {} creation failed: {}", logicalTableName, e.getMessage());
LOGGER.debug("Exception while creating logical table {}", logicalTableName, e);
```
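For illustration, a dependency-free sketch of the same idea follows. It uses
`java.util.logging` rather than the SLF4J-style `LOGGER` in this file, and
`LogicalTableLoggingSketch`, `createLogicalTable` and `tryCreate` are
hypothetical names, not code from the PR. It only shows that handing the
exception object to the logger, instead of `e.getMessage()` alone, keeps the
stack trace and the cause chain.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; not part of QuickStartBase.
final class LogicalTableLoggingSketch {
  static final Logger LOGGER = Logger.getLogger("LogicalTableLoggingSketch");

  // Stand-in for the controller POST: always fails, with a nested cause.
  static void createLogicalTable(String name) throws Exception {
    throw new IllegalStateException("POST /logicalTables failed for " + name,
        new IOException("connection refused"));
  }

  // Returns true on success; on failure logs the exception and keeps going.
  static boolean tryCreate(String logicalTableName) {
    try {
      createLogicalTable(logicalTableName);
      return true;
    } catch (Exception e) {
      // Passing e (not just e.getMessage()) attaches the stack trace and cause to the log record.
      LOGGER.log(Level.WARNING, "Logical table " + logicalTableName + " creation failed", e);
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("created = " + tryCreate("fineFoodReviews-federated"));
  }
}
```

With SLF4J the equivalent is to pass `e` as the last argument, as the
`LOGGER.debug` call in the suggestion does.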
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaPartitionLevelStreamConfig.java:
##########
@@ -120,6 +120,10 @@ public boolean isPopulateMetadata() {
return _populateMetadata;
}
+ public Map<String, String> getStreamConfigMap() {
+ return _streamConfigMap;
+ }
Review Comment:
`getStreamConfigMap()` exposes the internal `_streamConfigMap` by reference,
so callers could mutate it and implicitly change consumer behavior after
construction. Consider returning an unmodifiable view (or a defensive copy) to
keep the stream config immutable once parsed.
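
As a rough sketch of what that could look like (the class below is a
hypothetical stand-in, not the real `KafkaPartitionLevelStreamConfig`, and the
copy in the constructor is an assumption about how the map is stored):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in; only the getter pattern matters here.
final class StreamConfigSketch {
  private final Map<String, String> _streamConfigMap;

  StreamConfigSketch(Map<String, String> streamConfigMap) {
    // Copy on the way in so later changes to the caller's map do not leak into this object.
    _streamConfigMap = new HashMap<>(streamConfigMap);
  }

  // Read-only view: reads pass through, writes throw UnsupportedOperationException.
  Map<String, String> getStreamConfigMap() {
    return Collections.unmodifiableMap(_streamConfigMap);
  }

  public static void main(String[] args) {
    Map<String, String> source = new HashMap<>();
    source.put("topic", "fineFoodReviews"); // illustrative key, not a real Pinot property
    StreamConfigSketch config = new StreamConfigSketch(source);
    try {
      config.getStreamConfigMap().put("topic", "other");
    } catch (UnsupportedOperationException e) {
      System.out.println("mutation rejected");
    }
    System.out.println(config.getStreamConfigMap().get("topic"));
  }
}
```

`Collections.unmodifiableMap` avoids a copy on every call; `Map.copyOf` would
also work but rejects null keys and values, which may or may not matter here.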
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java:
##########
@@ -344,6 +408,25 @@ private List<PartitionInfo> fetchPartitionInfos(long timeoutMillis) {
new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic)));
}
+ private void validatePartitionIds(List<Integer> subset) {
+ List<Integer> topicPartitionIds = new ArrayList<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ Collections.sort(topicPartitionIds);
+ List<Integer> missingPartitionIds = new ArrayList<>();
+ for (Integer partitionId : subset) {
+ if (!topicPartitionIds.contains(partitionId)) {
Review Comment:
`validatePartitionIds()` calls `topicPartitionIds.contains(partitionId)` inside
the loop over `subset`. `List.contains` is a linear scan, so validation costs
O(n*m) for n requested ids and m topic partitions. Consider building a `Set`
for the membership checks (keeping the sorted list only for the error message)
so that each lookup is constant time on average.
```suggestion
Set<Integer> topicPartitionIdSet = new HashSet<>(topicPartitionIds);
List<Integer> missingPartitionIds = new ArrayList<>();
for (Integer partitionId : subset) {
if (!topicPartitionIdSet.contains(partitionId)) {
```
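The same check, pulled out into a standalone helper so it can be run in
isolation. `PartitionIdValidationSketch` and `findMissingPartitionIds` are
hypothetical names for this sketch; the real method fetches the topic's
partition ids from Kafka instead of taking them as a parameter.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not part of KafkaStreamMetadataProvider.
final class PartitionIdValidationSketch {
  // Returns the requested ids that are not present in the topic, in request order.
  static List<Integer> findMissingPartitionIds(List<Integer> topicPartitionIds, List<Integer> subset) {
    // One pass to build the set, then average constant-time lookups.
    Set<Integer> topicPartitionIdSet = new HashSet<>(topicPartitionIds);
    List<Integer> missingPartitionIds = new ArrayList<>();
    for (Integer partitionId : subset) {
      if (!topicPartitionIdSet.contains(partitionId)) {
        missingPartitionIds.add(partitionId);
      }
    }
    return missingPartitionIds;
  }

  public static void main(String[] args) {
    // Topic has partitions 0..3; partition 7 is requested but does not exist.
    System.out.println(findMissingPartitionIds(List.of(0, 1, 2, 3), List.of(1, 3, 7))); // prints [7]
  }
}
```

The total cost becomes O(n + m) on average instead of O(n*m).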
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -346,6 +410,25 @@ private List<PartitionInfo> fetchPartitionInfos(long timeoutMillis) {
new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic)));
}
+ private void validatePartitionIds(List<Integer> subset) {
+ List<Integer> topicPartitionIds = new ArrayList<>();
+ List<PartitionInfo> partitionInfos = fetchPartitionInfos(10_000L);
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ topicPartitionIds.add(partitionInfo.partition());
+ }
+ Collections.sort(topicPartitionIds);
+ List<Integer> missingPartitionIds = new ArrayList<>();
+ for (Integer partitionId : subset) {
+ if (!topicPartitionIds.contains(partitionId)) {
Review Comment:
`validatePartitionIds()` calls `topicPartitionIds.contains(partitionId)` inside
the loop over `subset`. `List.contains` is a linear scan, so validation costs
O(n*m) for n requested ids and m topic partitions. Consider building a `Set`
for the membership checks (keeping the sorted list only for the error message)
so that each lookup is constant time on average.
```suggestion
Set<Integer> topicPartitionIdSet = new HashSet<>(topicPartitionIds);
List<Integer> missingPartitionIds = new ArrayList<>();
for (Integer partitionId : subset) {
if (!topicPartitionIdSet.contains(partitionId)) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]