freemandealer commented on code in PR #62384:
URL: https://github.com/apache/doris/pull/62384#discussion_r3339852330
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java:
##########
@@ -148,10 +154,185 @@ public String toString() {
}
}
+ private static class OncePendingJobKey {
+ private final JobType jobType;
+ private final String srcName;
+ private final String dstName;
+ private final List<String> normalizedTables;
+ private final boolean force;
+
+ OncePendingJobKey(JobType jobType, String srcName, String dstName,
+ List<String> normalizedTables, boolean force) {
+ this.jobType = jobType;
+ this.srcName = normalizeNullableName(srcName);
+ this.dstName = normalizeNullableName(dstName);
+ this.normalizedTables = normalizedTables.isEmpty()
+ ? Collections.emptyList()
+ : Collections.unmodifiableList(new ArrayList<>(normalizedTables));
+ this.force = force;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof OncePendingJobKey)) {
+ return false;
+ }
+ OncePendingJobKey jobKey = (OncePendingJobKey) o;
+ return force == jobKey.force
+ && jobType == jobKey.jobType
+ && Objects.equals(srcName, jobKey.srcName)
+ && Objects.equals(dstName, jobKey.dstName)
+ && Objects.equals(normalizedTables, jobKey.normalizedTables);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobType, srcName, dstName, normalizedTables, force);
+ }
+
+ @Override
+ public String toString() {
+ return "OncePendingWarmUpJob{"
+ + "jobType=" + jobType
+ + ", src='" + srcName + '\''
+ + ", dst='" + dstName + '\''
+ + ", tables=" + normalizedTables
+ + ", force=" + force
+ + '}';
+ }
+ }
+
+ private static class RefCountedPendingCreateLock {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ // Tracks holders and waiters that retained the entry before locking.
+ private volatile int refCount = 1;
+
+ void retain() {
+ ++refCount;
+ }
+
+ int release() {
+ Preconditions.checkState(refCount > 0, "once pending create lock ref count underflow");
+ return --refCount;
+ }
+
+ void lock() {
+ lock.lock();
+ }
+
+ void unlock() {
+ lock.unlock();
+ }
+ }
+
// Tracks long-running jobs (event-driven and periodic).
// Ensures only one active job exists per <source, destination, sync_mode> tuple.
private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();
+ private static String normalizeNullableName(String value) {
+ return value == null ? "" : value;
+ }
+
+ private static String normalizeTableKey(Triple<String, String, String> tableTriple) {
+ String dbName = normalizeNullableName(tableTriple.getLeft());
+ String tableName = normalizeNullableName(tableTriple.getMiddle());
+ String partitionName = normalizeNullableName(tableTriple.getRight());
+ if (partitionName.isEmpty()) {
+ return dbName + "." + tableName;
+ }
+ return dbName + "." + tableName + "." + partitionName;
+ }
+
+ private static List<String> normalizeTables(List<Triple<String, String, String>> tables) {
+ if (tables == null || tables.isEmpty()) {
+ return Collections.emptyList();
+ }
+ HashSet<String> normalizedTables = new HashSet<>();
+ for (Triple<String, String, String> table : tables) {
+ normalizedTables.add(normalizeTableKey(table));
+ }
+ List<String> sortedTables = new ArrayList<>(normalizedTables);
+ Collections.sort(sortedTables);
+ return sortedTables;
+ }
+
+ private boolean isClusterOnceCommand(WarmUpClusterCommand command) {
+ Map<String, String> properties = command.getProperties();
+ if (properties == null) {
+ return true;
+ }
+ String syncMode = properties.get("sync_mode");
+ return !"periodic".equals(syncMode) && !"event_driven".equals(syncMode);
+ }
+
+ private OncePendingJobKey buildOncePendingJobKey(WarmUpClusterCommand command) {
+ if (command.isWarmUpWithTable()) {
+ return new OncePendingJobKey(JobType.TABLE, "", command.getDstCluster(),
+ normalizeTables(command.getTables()), command.isForce());
+ }
Review Comment:
Reasonable; we will inform users about that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]