7.实现任务的rebalance

发布时间:2023年12月18日

1.设计

1.1 背景

系统启动后,所有任务都在被执行,如果这时某个节点宕机,那它负责的任务就不能执行了,这对有稳定性要求的任务是不能接受的,所以系统要实现rebalance的功能。

1.2 设计

下面是Job分配与执行的业务点,重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了

2. 实现

2.1 RebalanceJobType

定义了重平衡job的类型

public enum RebalanceJobType {
    
    FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);
    
    private int code;

    RebalanceJobType(int code) {
        this.code = code;
    }
    
    public boolean isFollowerOffline() {
        return this.code == FOLLOWER_OFFLINE.code;
    }

    public boolean isControllerOffline() {
        return this.code == CONTROLLER_OFFLINE.code;
    }

    public boolean isNodeOnline() {
        return this.code == NODE_ONLINE.code;
    }
    
    
}

2.2 AverageJobAllotStrategy

添加了 rebalanceJob的方法,只有Controller才能调用,对不同的重平衡情况进行分别处理

private Map<Long, List<DttaskJob>> getDttaskJobMap() {
    List<DttaskJob> allDttaskJob = getAllDttaskJob();
    return average(allDttaskJob);
}

@Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {
    if (rebalanceJobContext.getType().isFollowerOffline()
            || rebalanceJobContext.getType().isControllerOffline()) {
        long offlineServerId = rebalanceJobContext.getServerId();
        log.info("{}节点={}下线->重平衡job={}",
                rebalanceJobContext.getType().isFollowerOffline() ? "follower" : "controller",
                offlineServerId,
                rebalanceJobContext);
        List<DttaskJob> dttaskJobs = getByDttaskId(offlineServerId);
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        Map<Long, List<DttaskJob>> allotMap = new HashMap<>();
        int i = 0;
        int nodeCount = nodeInfoList.size();
        while (i < dttaskJobs.size()) {
            DttaskJob dttaskJob = dttaskJobs.get(i);
            NodeInfo nodeInfo = nodeInfoList.get(i % nodeCount);
            i++;
            List<DttaskJob> dttaskJobList = allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList<>());
            dttaskJobList.add(dttaskJob);
            allotMap.put(nodeInfo.getServerId(), dttaskJobList);
        }
        executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));
    } else if (rebalanceJobContext.getType().isNodeOnline()) {
        log.info("节点上线->重平衡job={}", rebalanceJobContext);
        long onlineServerId = rebalanceJobContext.getServerId();
        Map<Long, List<DttaskJob>> dttaskJobMap = BeanUseHelper.entityHelpService().queryDttaskJob();
        Map<Long, List<DttaskJob>> allotDttaskJobMap = getDttaskJobMap();
        Map<Long, List<DttaskJob>> stopDttaskJobMapOfOldNodes = new HashMap<>();
        Map<Long, List<DttaskJob>> startDttaskJobMapOfNewNodes = new HashMap<>();
        List<DttaskJob> startDttaskJobs = new ArrayList<>();
        dttaskJobMap.forEach((serverId, dttaskJobList) -> {
            int size = dttaskJobList.size();
            int newSize = allotDttaskJobMap.get(serverId).size();
            if (size > newSize) {
                List<DttaskJob> dttaskJobs = dttaskJobList.subList(0, size - newSize);
                stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);
                startDttaskJobs.addAll(dttaskJobs);
            }
        });
        startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);
        executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));
        executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));
    }
}

2.3 ServerClientChannelHandler

对节点下线进行重平衡处理

2.4 NodeOnlineMessageService

3. 测试

启动三个节点,节点完成选举,每个节点执行2个任务

  • 3号节点下线

1 2 节点各分配了一个任务继续执行

  • 3号节点上线

新上线的3号节点,重新得到2个任务,1 2节点各停止一个任务

至此,节点上下线的任务重平衡完成

文章来源:https://blog.csdn.net/u010142228/article/details/135050459
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。