milenkovicm commented on code in PR #1547:
URL:
https://github.com/apache/datafusion-ballista/pull/1547#discussion_r3060423687
##########
ballista/executor/src/executor_server.rs:
##########
@@ -476,12 +497,44 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
};
}
- // TODO populate with real metrics
+ // Getting system-wide executor metrics
fn get_executor_metrics(&self) -> Vec<ExecutorMetric> {
+ let mut executor_system = self.executor_system.lock().unwrap();
+ let should_refresh = {
+ if executor_system.last_refresh.elapsed() >=
Duration::from_millis(100) {
+ executor_system.last_refresh = Instant::now();
+ true
+ } else {
+ false
+ }
+ };
+
+ if should_refresh {
+ executor_system.system.refresh_all();
+ }
+
+ let mut executor_metrics = Vec::new();
+
+ let total_memory = ExecutorMetric {
+ metric: Some(executor_metric::Metric::TotalMemory(
+ executor_system.system.total_memory(),
+ )),
+ };
+ executor_metrics.push(total_memory);
+
let available_memory = ExecutorMetric {
- metric: Some(executor_metric::Metric::AvailableMemory(u64::MAX)),
+ metric: Some(executor_metric::Metric::AvailableMemory(
+ executor_system.system.available_memory(),
+ )),
+ };
+ executor_metrics.push(available_memory);
+
+ let used_memory = ExecutorMetric {
+ metric: Some(executor_metric::Metric::UsedMemory(
+ executor_system.system.used_memory(),
+ )),
};
- let executor_metrics = vec![available_memory];
+ executor_metrics.push(used_memory);
Review Comment:
Can we cache all of this and replace it only when the memory has been refreshed?
##########
ballista/executor/src/executor_server.rs:
##########
@@ -476,12 +497,44 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
};
}
- // TODO populate with real metrics
+ // Getting system-wide executor metrics
fn get_executor_metrics(&self) -> Vec<ExecutorMetric> {
+ let mut executor_system = self.executor_system.lock().unwrap();
+ let should_refresh = {
+ if executor_system.last_refresh.elapsed() >=
Duration::from_millis(100) {
Review Comment:
Is there a way to configure this?
##########
ballista/executor/src/executor_server.rs:
##########
@@ -476,12 +497,44 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
};
}
- // TODO populate with real metrics
+ // Getting system-wide executor metrics
fn get_executor_metrics(&self) -> Vec<ExecutorMetric> {
+ let mut executor_system = self.executor_system.lock().unwrap();
+ let should_refresh = {
+ if executor_system.last_refresh.elapsed() >=
Duration::from_millis(100) {
+ executor_system.last_refresh = Instant::now();
+ true
+ } else {
+ false
+ }
+ };
+
+ if should_refresh {
+ executor_system.system.refresh_all();
Review Comment:
Should we use `refresh_memory_specifics` instead of `refresh_all`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]