This is an automated email from the ASF dual-hosted git repository.
wuweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 19d158010 Adjust JobScheduler init sequence to avoid using the wrong
ElasticJobListener (#2187)
19d158010 is described below
commit 19d1580102dba8498105737bb08bdb1c604e3970
Author: wizhuo <[email protected]>
AuthorDate: Tue Feb 28 09:31:05 2023 +0800
Adjust JobScheduler init sequence to avoid using the wrong ElasticJobListener
(#2187)
* Adjust JobScheduler init sequence to avoid using the wrong ElasticJobListener
when the config overwrite is false
* Adjust code style
* Remove unnecessary methods
---
.../lite/internal/schedule/JobScheduler.java | 30 +++++++++++++---------
.../lite/internal/setup/SetUpFacade.java | 16 ------------
.../lite/internal/setup/SetUpFacadeTest.java | 20 +--------------
3 files changed, 19 insertions(+), 47 deletions(-)
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
index 4629b3889..ffee2d7d8 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/schedule/JobScheduler.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import
org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListenerFactory;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import
org.apache.shardingsphere.elasticjob.lite.api.listener.AbstractDistributeOnceElasticJobListener;
+import
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.lite.internal.guarantee.GuaranteeService;
import
org.apache.shardingsphere.elasticjob.lite.internal.setup.JobClassNameProviderFactory;
import org.apache.shardingsphere.elasticjob.lite.internal.setup.SetUpFacade;
@@ -78,32 +79,37 @@ public final class JobScheduler {
public JobScheduler(final CoordinatorRegistryCenter regCenter, final
ElasticJob elasticJob, final JobConfiguration jobConfig) {
Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be
null.");
this.regCenter = regCenter;
- Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(jobConfig);
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(),
jobListeners);
String jobClassName =
JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
- this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName,
jobConfig);
- schedulerFacade = new SchedulerFacade(regCenter,
jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
+ this.jobConfig = setUpJobConfiguration(regCenter, jobClassName,
jobConfig);
+ Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(this.jobConfig);
+ setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(),
jobListeners);
+ schedulerFacade = new SchedulerFacade(regCenter,
this.jobConfig.getJobName());
+ jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig,
jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
jobScheduleController = createJobScheduleController();
}
-
+
public JobScheduler(final CoordinatorRegistryCenter regCenter, final
String elasticJobType, final JobConfiguration jobConfig) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(elasticJobType),
"Elastic job type cannot be null or empty.");
this.regCenter = regCenter;
- Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(jobConfig);
- setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(),
jobListeners);
- this.jobConfig = setUpFacade.setUpJobConfiguration(elasticJobType,
jobConfig);
- schedulerFacade = new SchedulerFacade(regCenter,
jobConfig.getJobName());
- jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
+ this.jobConfig = setUpJobConfiguration(regCenter, elasticJobType,
jobConfig);
+ Collection<ElasticJobListener> jobListeners =
getElasticJobListeners(this.jobConfig);
+ setUpFacade = new SetUpFacade(regCenter, this.jobConfig.getJobName(),
jobListeners);
+ schedulerFacade = new SchedulerFacade(regCenter,
this.jobConfig.getJobName());
+ jobFacade = new LiteJobFacade(regCenter, this.jobConfig.getJobName(),
jobListeners, findTracingConfiguration().orElse(null));
validateJobProperties();
jobExecutor = new ElasticJobExecutor(elasticJobType, this.jobConfig,
jobFacade);
setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
jobScheduleController = createJobScheduleController();
}
-
+
+ private JobConfiguration setUpJobConfiguration(final
CoordinatorRegistryCenter regCenter, final String jobClassName, final
JobConfiguration jobConfig) {
+ ConfigurationService configService = new
ConfigurationService(regCenter, jobConfig.getJobName());
+ return configService.setUpJobConfiguration(jobClassName, jobConfig);
+ }
+
private Collection<ElasticJobListener> getElasticJobListeners(final
JobConfiguration jobConfig) {
return jobConfig.getJobListenerTypes().stream()
.map(type ->
ElasticJobListenerFactory.createListener(type).orElseThrow(() -> new
IllegalArgumentException(String.format("Can not find job listener type '%s'.",
type))))
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
index 05ed51ff1..0479be285 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/main/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacade.java
@@ -17,9 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.setup;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
-import
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import
org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
@@ -34,8 +32,6 @@ import java.util.Collection;
*/
public final class SetUpFacade {
- private final ConfigurationService configService;
-
private final LeaderService leaderService;
private final ServerService serverService;
@@ -47,7 +43,6 @@ public final class SetUpFacade {
private final ListenerManager listenerManager;
public SetUpFacade(final CoordinatorRegistryCenter regCenter, final String
jobName, final Collection<ElasticJobListener> elasticJobListeners) {
- configService = new ConfigurationService(regCenter, jobName);
leaderService = new LeaderService(regCenter, jobName);
serverService = new ServerService(regCenter, jobName);
instanceService = new InstanceService(regCenter, jobName);
@@ -55,17 +50,6 @@ public final class SetUpFacade {
listenerManager = new ListenerManager(regCenter, jobName,
elasticJobListeners);
}
- /**
- * Set up job configuration.
- *
- * @param jobClassName job class name
- * @param jobConfig job configuration to be updated
- * @return accepted job configuration
- */
- public JobConfiguration setUpJobConfiguration(final String jobClassName,
final JobConfiguration jobConfig) {
- return configService.setUpJobConfiguration(jobClassName, jobConfig);
- }
-
/**
* Register start up info.
*
diff --git
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
index 846a1c4e3..5cd521d25 100644
---
a/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
+++
b/elasticjob-lite/elasticjob-lite-core/src/test/java/org/apache/shardingsphere/elasticjob/lite/internal/setup/SetUpFacadeTest.java
@@ -17,10 +17,7 @@
package org.apache.shardingsphere.elasticjob.lite.internal.setup;
-import org.apache.shardingsphere.elasticjob.api.ElasticJob;
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
-import
org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import
org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import
org.apache.shardingsphere.elasticjob.lite.internal.instance.InstanceService;
import
org.apache.shardingsphere.elasticjob.lite.internal.listener.ListenerManager;
@@ -36,17 +33,12 @@ import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class SetUpFacadeTest {
- @Mock
- private ConfigurationService configService;
-
@Mock
private LeaderService leaderService;
@@ -68,23 +60,13 @@ public final class SetUpFacadeTest {
public void setUp() {
JobRegistry.getInstance().addJobInstance("test_job", new
JobInstance("127.0.0.1@-@0"));
setUpFacade = new SetUpFacade(null, "test_job",
Collections.emptyList());
- ReflectionUtils.setFieldValue(setUpFacade, "configService",
configService);
ReflectionUtils.setFieldValue(setUpFacade, "leaderService",
leaderService);
ReflectionUtils.setFieldValue(setUpFacade, "serverService",
serverService);
ReflectionUtils.setFieldValue(setUpFacade, "instanceService",
instanceService);
ReflectionUtils.setFieldValue(setUpFacade, "reconcileService",
reconcileService);
ReflectionUtils.setFieldValue(setUpFacade, "listenerManager",
listenerManager);
}
-
- @Test
- public void assertSetUpJobConfiguration() {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("test_job", 3)
- .cron("0/1 * * * * ?").setProperty("streaming.process",
Boolean.TRUE.toString()).build();
- when(configService.setUpJobConfiguration(ElasticJob.class.getName(),
jobConfig)).thenReturn(jobConfig);
-
assertThat(setUpFacade.setUpJobConfiguration(ElasticJob.class.getName(),
jobConfig), is(jobConfig));
-
verify(configService).setUpJobConfiguration(ElasticJob.class.getName(),
jobConfig);
- }
-
+
@Test
public void assertRegisterStartUpInfo() {
setUpFacade.registerStartUpInfo(true);