ZanderXu commented on code in PR #6833:
URL: https://github.com/apache/hadoop/pull/6833#discussion_r1607499422
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record)
throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}
+
+ @Override
+ public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T>
records) throws IOException {
+ assert !records.isEmpty();
+ // Fall back to iterative remove() calls if all records don't share 1 class
+ Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+ if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+ Map<T, Boolean> result = new HashMap<>();
+ for (T record : records) {
+ result.put(record, remove(record));
+ }
+ return result;
+ }
+
+ final List<Query<T>> queries = new ArrayList<>();
+ for (T record: records) {
+ queries.add(new Query<>(record));
+ }
+ @SuppressWarnings("unchecked")
+ Class<T> recordClass = (Class<T>)
StateStoreUtils.getRecordClass(expectedClazz);
+ Map<Query<T>, Integer> result = remove(recordClass, queries);
+ return result.entrySet().stream()
+ .collect(Collectors.toMap(e -> e.getKey().getPartial(), e ->
e.getValue() > 0));
Review Comment:
`remove(T record)` returns true if `remove(recordClass, query)` is 1. But
here is `e.getValue() > 0`. So how about make them consistent?
Here, how about using `e.getValue() == 1`?
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMembershipState.java:
##########
@@ -565,7 +568,7 @@ public void testRegistrationExpiredRaceCondition()
// Load cache
MembershipStore memStoreSpy = spy(membershipStore);
DelayAnswer delayer = new DelayAnswer(LOG);
- doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any());
+ doAnswer(delayer).when(memStoreSpy).overrideExpiredRecords(any(),
anyBoolean());
Review Comment:
remove this `anyBoolean()`
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult
putAll(
}
@Override
- public <T extends BaseRecord> int remove(
- Class<T> clazz, Query<T> query) throws IOException {
+ public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+ List<Query<T>> queries) throws IOException {
verifyDriverReady();
- if (query == null) {
- return 0;
+ // Track how many entries are deleted by each query
+ Map<Query<T>, Integer> ret = new HashMap<>();
+ final List<T> trueRemoved = Collections.synchronizedList(new
ArrayList<>());
+ if (queries.isEmpty()) {
+ return ret;
}
// Read the current data
long start = monotonicNow();
- List<T> records = null;
+ List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
- return 0;
+ return ret;
}
// Check the records to remove
String znode = getZNodeForClass(clazz);
- List<T> recordsToRemove = filterMultiple(query, records);
+ Set<T> recordsToRemove = new HashSet<>();
+ Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+ for (Query<T> query : queries) {
+ List<T> filtered = filterMultiple(query, records);
+ queryToRecords.put(query, filtered);
+ recordsToRemove.addAll(filtered);
+ }
// Remove the records
- int removed = 0;
- for (T existingRecord : recordsToRemove) {
+ List<Callable<Void>> callables = new ArrayList<>();
+ recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
- removed++;
+ trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
+ return null;
+ }));
+ try {
+ if (enableConcurrent) {
+ executorService.invokeAll(callables);
+ } else {
+ for (Callable<Void> callable : callables) {
+ callable.call();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Record removal failed : {}", e.getMessage(), e);
+ throw new IOException(e);
Review Comment:
`throw new IOException(e);` should be removed since `remove(Class<T> clazz,
Query<T> query)` does not throw any exceptions except
`StateStoreUnavailableException` in `verifyDriverReady`.
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java:
##########
@@ -284,51 +288,88 @@ public <T extends BaseRecord> StateStoreOperationResult
putAll(
}
@Override
- public <T extends BaseRecord> int remove(
- Class<T> clazz, Query<T> query) throws IOException {
+ public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz,
+ List<Query<T>> queries) throws IOException {
verifyDriverReady();
- if (query == null) {
- return 0;
+ // Track how many entries are deleted by each query
+ Map<Query<T>, Integer> ret = new HashMap<>();
+ final List<T> trueRemoved = Collections.synchronizedList(new
ArrayList<>());
+ if (queries.isEmpty()) {
+ return ret;
}
// Read the current data
long start = monotonicNow();
- List<T> records = null;
+ List<T> records;
try {
QueryResult<T> result = get(clazz);
records = result.getRecords();
} catch (IOException ex) {
LOG.error("Cannot get existing records", ex);
getMetrics().addFailure(monotonicNow() - start);
- return 0;
+ return ret;
}
// Check the records to remove
String znode = getZNodeForClass(clazz);
- List<T> recordsToRemove = filterMultiple(query, records);
+ Set<T> recordsToRemove = new HashSet<>();
+ Map<Query<T>, List<T>> queryToRecords = new HashMap<>();
+ for (Query<T> query : queries) {
+ List<T> filtered = filterMultiple(query, records);
+ queryToRecords.put(query, filtered);
+ recordsToRemove.addAll(filtered);
+ }
// Remove the records
- int removed = 0;
- for (T existingRecord : recordsToRemove) {
+ List<Callable<Void>> callables = new ArrayList<>();
+ recordsToRemove.forEach(existingRecord -> callables.add(() -> {
LOG.info("Removing \"{}\"", existingRecord);
try {
String primaryKey = getPrimaryKey(existingRecord);
String path = getNodePath(znode, primaryKey);
if (zkManager.delete(path)) {
- removed++;
+ trueRemoved.add(existingRecord);
} else {
LOG.error("Did not remove \"{}\"", existingRecord);
}
} catch (Exception e) {
LOG.error("Cannot remove \"{}\"", existingRecord, e);
getMetrics().addFailure(monotonicNow() - start);
}
+ return null;
+ }));
+ try {
+ if (enableConcurrent) {
+ executorService.invokeAll(callables);
+ } else {
+ for (Callable<Void> callable : callables) {
+ callable.call();
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Record removal failed : {}", e.getMessage(), e);
+ throw new IOException(e);
}
long end = monotonicNow();
- if (removed > 0) {
+ if (!trueRemoved.isEmpty()) {
getMetrics().addRemove(end - start);
}
- return removed;
+ // Generate return map
+ for (Map.Entry<Query<T>, List<T>> entry : queryToRecords.entrySet()) {
+ for (T record : entry.getValue()) {
+ if (trueRemoved.contains(record)) {
+ ret.compute(entry.getKey(), (k, v) -> (v == null) ? 1 : v + 1);
+ break;
Review Comment:
This `break` should be removed?
##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreBaseImpl.java:
##########
@@ -86,4 +89,37 @@ public <T extends BaseRecord> boolean remove(T record)
throws IOException {
Class<T> recordClass = (Class<T>)StateStoreUtils.getRecordClass(clazz);
return remove(recordClass, query) == 1;
}
+
+ @Override
+ public <T extends BaseRecord> Map<T, Boolean> removeMultiple(List<T>
records) throws IOException {
+ assert !records.isEmpty();
+ // Fall back to iterative remove() calls if all records don't share 1 class
+ Class<? extends BaseRecord> expectedClazz = records.get(0).getClass();
+ if (!records.stream().allMatch(x -> x.getClass() == expectedClazz)) {
+ Map<T, Boolean> result = new HashMap<>();
+ for (T record : records) {
+ result.put(record, remove(record));
+ }
+ return result;
+ }
+
+ final List<Query<T>> queries = new ArrayList<>();
+ for (T record: records) {
Review Comment:
`for (T record : records) {`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]