Hisoka-X commented on code in PR #9776:
URL: https://github.com/apache/seatunnel/pull/9776#discussion_r2306487215
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java:
##########
@@ -893,47 +892,69 @@ public void removeMetricsContext(
if ((pipelineStatus.equals(PipelineStatus.FINISHED)
&&
!checkpointManager.isPipelineSavePointEnd(pipelineLocation))
|| pipelineStatus.equals(PipelineStatus.CANCELED)) {
+ final long deadlineNanos = System.nanoTime() +
TimeUnit.MINUTES.toNanos(2);
+ long backoffMillis = 1000;
+ final long maxBackoffMillis = 10000;
+ int attempts = 0;
+ while (isRunning) {
+
+ InvocationFuture<Object> invoke =
+ nodeEngine
+ .getOperationService()
+ .createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new
RemoveMetricsOperation(pipelineLocation),
+ nodeEngine.getMasterAddress())
+ .invoke();
Review Comment:
The JobMaster always runs on the master node, so I think opening an operation
here is unnecessary. We can call the method directly.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -571,16 +571,81 @@ public void provideDynamicMetrics(
}
private void updateMetricsContextInImap() {
- if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ final long deadlineNanos = System.nanoTime() +
TimeUnit.MINUTES.toNanos(2);
+ long backoffMillis = 1000;
+ final long maxBackoffMillis = 10000;
+ int attempts = 0;
+ while (isRunning) {
+ if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ logger.warning(
+ String.format(
+ "The Node is not ready yet, Node state
%s,looking forward to the next "
+ + "scheduling",
+ nodeEngine.getNode().getState()));
+ return;
+ }
+
+ InvocationFuture<Object> invoke =
+ nodeEngine
+ .getOperationService()
+ .createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new
ReportMetricsOperation(collectLocalMetricsMap()),
+ nodeEngine.getMasterAddress())
+ .invoke();
+
+ try {
+ invoke.get();
+ return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.severe("update metrics context stopped due to thread
interruption.", e);
+ return;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof JobNotFoundException) {
+ logger.warning(
+ "update metrics context in imap failed because
can't find job", e);
+ return;
+ }
+ if (HazelcastRetryUtils.isRetryable(cause)) {
+ logger.warning(ExceptionUtils.getMessage(e), e);
+ } else {
+ logger.severe("non-retryable failure while updating
metrics", e);
+ return;
+ }
Review Comment:
We should not handle this ourselves; Hazelcast will handle this situation.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -336,6 +343,53 @@ private void printExecutionInfo() {
}
}
+ public synchronized void updateMetrics(Map<TaskLocation,
SeaTunnelMetricsContext> localMap) {
+ if (localMap == null || localMap.isEmpty()) {
+ return;
+ }
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+
+ HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+ metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+
+ if (centralMap == null) {
+ centralMap = new HashMap<>();
+ }
+ centralMap.putAll(localMap);
+ metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap);
+ }
+
+ public synchronized void removeMetrics(PipelineLocation pipelineLocation) {
+ IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> metricsImap
=
+
getNodeEngine().getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);
+
+ HashMap<TaskLocation, SeaTunnelMetricsContext> centralMap =
+ metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY);
+ if (centralMap == null) {
+ return;
+ }
+
+ List<TaskLocation> taskLocations =
+ getTaskLocations(
+ pipelineLocation,
metricsImap.get(Constant.IMAP_RUNNING_JOB_METRICS_KEY));
Review Comment:
```suggestion
pipelineLocation, centralMap);
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -571,16 +571,81 @@ public void provideDynamicMetrics(
}
private void updateMetricsContextInImap() {
- if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ final long deadlineNanos = System.nanoTime() +
TimeUnit.MINUTES.toNanos(2);
+ long backoffMillis = 1000;
+ final long maxBackoffMillis = 10000;
+ int attempts = 0;
+ while (isRunning) {
+ if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ logger.warning(
+ String.format(
+ "The Node is not ready yet, Node state
%s,looking forward to the next "
+ + "scheduling",
+ nodeEngine.getNode().getState()));
+ return;
+ }
+
+ InvocationFuture<Object> invoke =
+ nodeEngine
+ .getOperationService()
+ .createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new
ReportMetricsOperation(collectLocalMetricsMap()),
Review Comment:
`collectLocalMetricsMap` should only be executed once.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -571,16 +571,81 @@ public void provideDynamicMetrics(
}
private void updateMetricsContextInImap() {
- if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ final long deadlineNanos = System.nanoTime() +
TimeUnit.MINUTES.toNanos(2);
+ long backoffMillis = 1000;
+ final long maxBackoffMillis = 10000;
+ int attempts = 0;
+ while (isRunning) {
+ if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ logger.warning(
+ String.format(
+ "The Node is not ready yet, Node state
%s,looking forward to the next "
+ + "scheduling",
+ nodeEngine.getNode().getState()));
+ return;
+ }
+
+ InvocationFuture<Object> invoke =
+ nodeEngine
+ .getOperationService()
+ .createInvocationBuilder(
+ SeaTunnelServer.SERVICE_NAME,
+ new
ReportMetricsOperation(collectLocalMetricsMap()),
+ nodeEngine.getMasterAddress())
+ .invoke();
+
+ try {
+ invoke.get();
+ return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.severe("update metrics context stopped due to thread
interruption.", e);
+ return;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof JobNotFoundException) {
Review Comment:
Would this really happen?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -571,16 +571,81 @@ public void provideDynamicMetrics(
}
private void updateMetricsContextInImap() {
- if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) {
+ final long deadlineNanos = System.nanoTime() +
TimeUnit.MINUTES.toNanos(2);
+ long backoffMillis = 1000;
+ final long maxBackoffMillis = 10000;
+ int attempts = 0;
+ while (isRunning) {
Review Comment:
We don't need to handle retries here, because `updateMetricsContextInImap`
will be invoked by the scheduler:
https://github.com/apache/seatunnel/blob/0ec6997a3e287d92e7a058bc3033517a9f479ab8/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java#L168
##########
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java:
##########
@@ -110,20 +105,6 @@ private static void runJobFileWithAssertEndStatus(
}
}
- @Test
- public void testExecuteJobWithLockMetrics() throws Exception {
Review Comment:
Why was this test case removed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]