Skip to content

Commit

Permalink
remove DEFAULT_JOB_INSTANCE_ID
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Apr 7, 2017
1 parent e517384 commit c808c8e
Show file tree
Hide file tree
Showing 30 changed files with 225 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistry
* 初始化作业.
*/
public void init() {
schedulerFacade.registerStartUpInfo(liteJobConfig);
JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfig.getJobName(), liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfig.getTypeConfig().getJobClass()), liteJobConfig.getJobName());
jobScheduleController.scheduleJob(liteJobConfig.getTypeConfig().getCoreConfig().getCron());
JobRegistry.getInstance().registerJob(liteJobConfig.getJobName(), jobScheduleController, regCenter);
schedulerFacade.registerStartUpInfo(liteJobConfig);
}

private JobDetail createJobDetail(final String jobClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

/**
* 默认作业运行实例主键.
*/
public static final String DEFAULT_INSTANCE_ID = "1.1.1.1@-@1";

private static final String DELIMITER = "@-@";

/**
Expand All @@ -58,13 +53,4 @@ public JobInstance() {
public String getIp() {
return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
}

/**
* 判断是否为默认作业运行实例.
*
* @return 是否为默认作业运行实例
*/
public boolean isDefaultJobInstance() {
return DEFAULT_INSTANCE_ID.equals(jobInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class CronSettingAndJobEventChangedJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && null != JobRegistry.getInstance().getJobScheduleController(jobName)) {
if (configNode.isConfigPath(path) && Type.NODE_UPDATED == eventType && !JobRegistry.getInstance().isShutdown(jobName)) {
JobRegistry.getInstance().getJobScheduleController(jobName).rescheduleJob(LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getCron());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,43 @@ public ElectionListenerManager(final CoordinatorRegistryCenter regCenter, final
@Override
public void start() {
addDataListener(new LeaderElectionJobListener());
addDataListener(new LeaderAbdicationJobListener());
}

class LeaderElectionJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())
|| !leaderService.hasLeader() && isLocalServerEnabled(path, data)) {
if (!JobRegistry.getInstance().getJobInstance(jobName).isDefaultJobInstance()) {
leaderService.electLeader();
}
} else if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
leaderService.electLeader();
}
}

private boolean isActiveElection(final String path, final String data) {
return !leaderService.hasLeader() && isLocalServerEnabled(path, data);
}

private boolean isPassiveElection(final String path, final Type eventType) {
return isLeaderCrashed(path, eventType) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp());
}

private boolean isLeaderCrashed(final String path, final Type eventType) {
return leaderNode.isLeaderInstancePath(path) && Type.NODE_REMOVED == eventType;
}

private boolean isLocalServerEnabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && !ServerStatus.DISABLED.name().equals(data);
}
}

class LeaderAbdicationJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}

private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public boolean isLeaderUntilBlock() {
while (!hasLeader() && serverService.hasAvailableServers()) {
log.info("Leader is electing, waiting for {} ms", 100);
BlockUtils.waitingShortTime();
if (serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
if (!JobRegistry.getInstance().isShutdown(jobName) && serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
electLeader();
}
}
Expand All @@ -81,7 +81,7 @@ public boolean isLeaderUntilBlock() {
* @return 当前节点是否是主节点
*/
public boolean isLeader() {
return JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId().equals(jobNodeStorage.getJobNodeData(LeaderNode.INSTANCE));
return !JobRegistry.getInstance().isShutdown(jobName) && JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId().equals(jobNodeStorage.getJobNodeData(LeaderNode.INSTANCE));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.dangdang.ddframe.job.lite.internal.failover;

import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.lite.internal.config.ConfigurationNode;
import com.dangdang.ddframe.job.lite.internal.config.ConfigurationService;
import com.dangdang.ddframe.job.lite.internal.config.LiteJobConfigurationGsonFactory;
Expand Down Expand Up @@ -78,7 +79,9 @@ private void failover(final Integer item, final Type eventType) {
}

private boolean isJobCrashAndNeedFailover(final Integer item, final Type eventType) {
return null != item && Type.NODE_REMOVED == eventType && !executionService.isCompleted(item) && configService.load(true).isFailover();
LiteJobConfiguration jobConfig = configService.load(true);
boolean isFailover = null != jobConfig && jobConfig.isFailover();
return null != item && Type.NODE_REMOVED == eventType && !executionService.isCompleted(item) && isFailover;
}

class JobCrashedJobListener extends AbstractJobListener {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.dangdang.ddframe.job.lite.internal.failover;

import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingNode;
import com.dangdang.ddframe.job.lite.internal.sharding.ShardingService;
import com.dangdang.ddframe.job.lite.internal.storage.JobNodeStorage;
Expand Down Expand Up @@ -96,7 +95,10 @@ public void updateFailoverComplete(final Collection<Integer> items) {
*
* @return 运行在本作业服务器的失效转移序列号
*/
public List<Integer> getLocalHostFailoverItems() {
public List<Integer> getLocalFailoverItems() {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return Collections.emptyList();
}
List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
List<Integer> result = new ArrayList<>(items.size());
String ip = JobRegistry.getInstance().getJobInstance(jobName).getIp();
Expand Down Expand Up @@ -140,18 +142,15 @@ class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {

@Override
public void execute() {
if (!needFailover()) {
if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
return;
}
int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getIp());
jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
// TODO 不应使用triggerJob, 而是使用executor统一调度
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null != jobScheduleController) {
jobScheduleController.triggerJob();
}
JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.dangdang.ddframe.job.lite.internal.listener.AbstractJobListener;
import com.dangdang.ddframe.job.lite.internal.listener.AbstractListenerManager;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
import com.dangdang.ddframe.job.lite.internal.schedule.JobScheduleController;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;

Expand Down Expand Up @@ -57,13 +56,9 @@ protected void dataChanged(final String path, final Type eventType, final String
return;
}
instanceService.clearTriggerFlag();
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (null == jobScheduleController) {
return;
}
// TODO 目前是作业运行时不能触发, 未来改为堆积式触发
if (!JobRegistry.getInstance().isJobRunning(jobName)) {
jobScheduleController.triggerJob();
if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
// TODO 目前是作业运行时不能触发, 未来改为堆积式触发
JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public RegistryCenterConnectionStateListener(final CoordinatorRegistryCenter reg

@Override
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
if (JobRegistry.getInstance().isShutdown(jobName)) {
return;
}
JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
if (ConnectionState.LOST == newState) {
jobScheduleController.pauseJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void addJobInstance(final String jobName, final JobInstance jobInstance)
* @return 作业运行实例
*/
public JobInstance getJobInstance(final String jobName) {
JobInstance result = jobInstanceMap.get(jobName);
return null == result ? new JobInstance(JobInstance.DEFAULT_INSTANCE_ID) : result;
return jobInstanceMap.get(jobName);
}

/**
Expand Down Expand Up @@ -176,4 +175,14 @@ public void shutdown(final String jobName) {
jobRunningMap.remove(jobName);
currentShardingTotalCountMap.remove(jobName);
}

/**
* 判断任务调度是否已终止.
*
* @param jobName 作业名称
* @return 任务调度是否已终止
*/
public boolean isShutdown(final String jobName) {
return !schedulerMap.containsKey(jobName) || !jobInstanceMap.containsKey(jobName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void registerJobCompleted(final ShardingContexts shardingContexts) {
public ShardingContexts getShardingContexts() {
boolean isFailover = configService.load(true).isFailover();
if (isFailover) {
List<Integer> failoverShardingItems = failoverService.getLocalHostFailoverItems();
List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
if (!failoverShardingItems.isEmpty()) {
return executionContextService.getJobShardingContext(failoverShardingItems);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ public void shutdownInstance() {
leaderService.removeLeader();
}
monitorService.close();
// TODO cannot mock, use power mock
// if (reconcileService.isRunning()) {
// reconcileService.stopAsync();
// }
if (reconcileService.isRunning()) {
reconcileService.stopAsync();
}
JobRegistry.getInstance().shutdown(jobName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public final class ServerNode {

private static final String SERVERS = ROOT + "/%s";

private final String ip;
private final String jobName;

private final JobNodePath jobNodePath;

public ServerNode(final String jobName) {
ip = JobRegistry.getInstance().getJobInstance(jobName).getIp();
this.jobName = jobName;
jobNodePath = new JobNodePath(jobName);
}

Expand All @@ -63,7 +63,7 @@ public boolean isServerPath(final String path) {
* @return 是否为本地作业服务器路径
*/
public boolean isLocalServerPath(final String path) {
return path.equals(jobNodePath.getFullPath(String.format(SERVERS, ip)));
return path.equals(jobNodePath.getFullPath(String.format(SERVERS, JobRegistry.getInstance().getJobInstance(jobName).getIp())));
}

String getServerNode(final String ip) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public ServerService(final CoordinatorRegistryCenter regCenter, final String job
* @param enabled 作业是否启用
*/
public void persistOnline(final boolean enabled) {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
if (!JobRegistry.getInstance().isShutdown(jobName)) {
jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class ListenServersChangedJobListener extends AbstractJobListener {

@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().getJobInstance(jobName).isDefaultJobInstance() && (isInstanceChange(eventType, path) || isServerChange(path))) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
shardingService.setReshardingFlag();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ private void resetShardingInfo(final int shardingTotalCount) {
* @return 运行在本作业实例的分片项集合
*/
public List<Integer> getLocalShardingItems() {
if (!serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
if (JobRegistry.getInstance().isShutdown(jobName) || !serverService.isAvailableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,17 @@
import org.junit.Test;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public final class JobInstanceTest {

@Test
public void assertGetJobInstanceId() {
assertThat(new JobInstance(JobInstance.DEFAULT_INSTANCE_ID).getJobInstanceId(), is("1.1.1.1@-@1"));
assertThat(new JobInstance("127.0.0.1@-@0").getJobInstanceId(), is("127.0.0.1@-@0"));
}

@Test
public void assertGetIp() {
assertThat(new JobInstance(JobInstance.DEFAULT_INSTANCE_ID).getIp(), is("1.1.1.1"));
assertThat(new JobInstance().getIp(), is(IpUtils.getIp()));
}

@Test
public void assertIsDefaultJobInstance() {
assertTrue(new JobInstance(JobInstance.DEFAULT_INSTANCE_ID).isDefaultJobInstance());
}

@Test
public void assertIsNotDefaultJobInstance() {
assertFalse(new JobInstance().isDefaultJobInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.dangdang.ddframe.job.lite.internal.config;

import com.dangdang.ddframe.job.event.JobEventBus;
import com.dangdang.ddframe.job.lite.api.strategy.JobInstance;
import com.dangdang.ddframe.job.lite.fixture.LiteJsonConstants;
import com.dangdang.ddframe.job.lite.internal.config.RescheduleListenerManager.CronSettingAndJobEventChangedJobListener;
import com.dangdang.ddframe.job.lite.internal.schedule.JobRegistry;
Expand Down Expand Up @@ -83,8 +84,10 @@ public void assertCronSettingChangedJobListenerWhenIsCronPathAndUpdateButCannotF

@Test
public void assertCronSettingChangedJobListenerWhenIsCronPathAndUpdateAndFindJob() {
JobRegistry.getInstance().addJobInstance("test_job", new JobInstance("127.0.0.1@-@0"));
JobRegistry.getInstance().registerJob("test_job", jobScheduleController, regCenter);
rescheduleListenerManager.new CronSettingAndJobEventChangedJobListener().dataChanged("/test_job/config", Type.NODE_UPDATED, LiteJsonConstants.getJobJson());
verify(jobScheduleController).rescheduleJob("0/1 * * * * ?");
JobRegistry.getInstance().shutdown("test_job");
}
}
Loading

0 comments on commit c808c8e

Please sign in to comment.