SDN控制器-ONOS中的最终一致性存储

发布时间:2023年12月18日

ONOS中的数据存储基本上都是以KV进行存储的。按照一致性强弱类型可以分为强一致性存储(strong consistency)与弱一致性存储(eventually consistency)。

比较典型的,如ONOS中对于设备接口的存储,使用的是强一致类型存储,调用的是

org.onosproject.store.service.StorageService#consistentMapBuilder

所构建出来的ConsistentMap,由atomix项目提供封装,底层使用raft协议实现。遗憾的是atomix的JAVA版本目前已停止维护。

如ONOS中对于设备信息的存储,使用的是最终一致性(弱一致性)存储,实现机制使用的是gossip协议实现。

本文以gossip在ONOS中实现机制详细过程为目标,并使用理论+代码实践的方式进行综合性梳理记录。

gossip协议(理论)

A gossip protocol or epidemic protocol is a procedure or process of computer peer-to-peer communication that is based on the way epidemics spread.

Gossip protocol 或 Epidemic Protocol (流行病协议),是基于流行病传播方式的节点或者进程之间信息交换的协议。gossip协议在分布式系统中处理数据最终一致性的场景得到了广泛应用。

goosip协议中数据的传播方式有两种,分别为反熵(Anti-Entropy)和传谣(Anti-Entropy)。

传播方式

反熵

先回顾一下熵(Entropy) 的概念。
在信息论中,熵一般指的是对不确定程度的一种度量。如用熵来描述数据的随机性,熵越大则数据的有序性也就越差。对随机数来讲熵越大,随机数的随机性越好。

所谓反熵(Anti-Entropy) 即是熵(Entropy) 的反派,——和熵对着干

提炼一下:在集群的多个节点中,某个节点的数据在向集群中其他节点传播的过程中,在反熵作用下会同步节点的全部数据,最终达到集群中各节点数据完全的一致。

反熵传播使用“simple epidemics(SI model)”的方式,以固定的概率传播所有的数据。所有参与节点只有两种状态:

Suspective(病原):处于 susceptible 状态的节点代表其并没有收到来自其他节点的更新。
Infective(感染):处于 infective 状态的节点代表其有数据更新,并且会将这个数据分享给其他节点。

反熵传播方法每次节点两两交换自己的所有数据会带来非常大的通信负担,因此不会频繁使用,通常只用于新加入节点的数据初始化。

传谣

Rumor-Mongering(谣言传播)是gossip中的另一种数据传播机制。

使用“complex epidemics”(SIR model)的方式,以固定的概率仅传播新到达的数据。所有参与节点有三种状态:Suspective(病原)、Infective(感染)、Removed(愈除)。

Removed(愈除):其已经接收到来自其他节点的更新,但是其并不会将这个更新分享给其他节点。

谣言传播过程是消息只包含最新 update,谣言消息在某个时间点之后会被标记为removed,并且不再被传播。缺点是系统有一定的概率会不一致,通常用于节点间数据增量同步。

通信方式

推送push、拉pull、推拉模式(push/pull)

推送push

节点A随机选择联系节点B,并向其发送自己的信息,节点B在收到信息后比较/更新自己的数据。

拉取模式(pull)

节点A随机选择联系节点B,从对方获取信息,节点A在收到信息后比较/更新自己的数据。

推/拉模式(push/pull)

节点A向选择的节点B发送信息,同时从对方获取信息,节点A和节点B在收到信息后各自比较/更新自己的数据。

GOSSIP在ONOS中的应用(实践)

在了解了GOSSIP的理论知识后,再结合ONOS中的设备信息存储的原理,学习一下GOSSIP在ONOS中的具体实现细节。

ONOS集群管理

在一个onos集群中,由多个node节点组成了一个onos集群。

配置时通过修改onos根路径下的config/cluster.json配置文件配置集群信息。ONOS官方也提供了脚本可自动生成集群所需的配置信息,链接如下:https://wiki.onosproject.org/display/ONOS/Notes+on+cluster+formation+for+Docker+instances

配置示例:

{   
    "node": { 
        "ip": "192.168.1.11",
        "id": "onos-1",
        "port": 9876
    }, 
    "storage": [
        {   
            "ip": "192.168.2.21",
            "id": "atomix-1",
            "port": 5679
        }
    ],
    "name": "onos"
}

在onos集群中各onos节点一般使用9876端口进行通信(东西向通信),具体实现方式也由atomix框架提供。

集群信息的维护由ClusterManager调用AtomixClusterStore实现,集群中的节点信息保存在atomix中。在ONOS中如要获取集群中的节点信息,可调用如下方法获取:

atomixManager.getAtomix().getMembershipService().getMembers()

ONOS中也提供了ClusterCommunicationService用于集群中的节点与其他节点进行通信,如向集群中的所有节点发送消息(广播),直接调用ClusterCommunicationService中的broadcast方法即可,节点间发送和处理消息变得简单又方便。

atomix集群

生产环境中,为提高系统可用性onos和atomix都以集群方式运行。atomix各节点一般使用5679端口进行通信,底层使用RAFT协议实现数据的强一致性。

RAFT协议详细介绍:https://raft.github.io/

atomix成员管理机制

atomix集群中各节点的状态由atomix中的成员管理协议(MembershipProtocol)决定。
当前版本中的atomix支持以下两种协议:

  • HeartbeatMembershipProtocol
  • SwimMembershipProtocol

heartbeat与swim的区别可用如下两张图对比:
heartbeat方式通信示意图:
heartbeat
swim方式通信示意图:
swimProtocol

SWIM在每个周期的检测流程如下:

  1. 一个node A发送ping给list中的随机一个node(比如就叫B吧),如果B收到了就返回ack给A。
  2. 如果A没在预定的时间(小于周期T)内收到这个ack,它会在list中随机挑选 k 个node并发送 ping-req(B)请求它们帮助自己来确认B是否活着,若没有任何一个node告诉A说B活着,那A认为B挂了,并把它从list中移除,然后广播到整个网络中去。

总结一下
heartbeat方式下集群中进行一次数据交互的发包次数为N2,适用于小规模集群。反之如果集群中的主机较多,则使用swim协议可提升通信性能。

需要注意的是,在atomix协议中swim的实现使用到了UDP协议,一定条件下存在丢包出现不稳定的情况。如果集群中节点数较少还是推荐使用heartbeat的方式。以下atomix中使用hearbeat方式的某个示例:

cluster {
  clusterId: onos
  discovery {
    type: bootstrap
    nodes.1 {
      id: atomix-1
      address: "192.168.2.21:5679"
    }
    nodes.2 {
      id: atomix-2
      address: "192.168.2.22:5679"
    }
    nodes.3 {
      id: atomix-3
      address: "192.168.2.23:5679"
    }
  }
 
  protocol {
    type: heartbeat
    heartbeatInterval: 1s
    phiFailureThreshold: 10
    failureTimeout: 10s
  }
}
managementGroup {
  type: raft
  partitions: 1
  storage.level: disk
  members: [atomix-1, atomix-2, atomix-3]
}
partitionGroups.raft {
  type: raft
  partitions: 3
  partitionSize: 3
  storage.level: disk
  members: [atomix-1, atomix-2, atomix-3]
}

设备信息存储源码分析

下面进入正题,——以ONOS中的Device信息存储为例,分析整理一下GOSSIP在设备信息存储中的实现细节。

ONOS中设备信息的管理由实现了DeviceService的DeviceManager进行承载,在DeviceManger中对设备信息的操作使用的是面向接口的方式最终调用的是DeviceStore接口。

实现了DeviceStore的具体类有ECDeviceStore和GossipDeviceStore。当前较新版本的ONOS中使用的是GossipDeviceStore,也就是下文中要详细跟进的重点——Gossip协议实现的DeviceStore。

GossipDeviceStore#activate

ONOS中的类主要都是构建在OSGI上的,GossipDeviceStore也不例外。按照OSGI对象管理的方式,一个组件(Component)会执行被 @Activate 注解修饰的方法进行初始化时,执行被 @Deactivate 注解修饰的方法进行资源释放。

GossipDeviceStore作为Gossip协议实现的存储类,许多的核心步骤也主要在activate方法中进行整体的管理。

上源码:

    @Activate
    public void activate() {
        //资源申请,获得线程池
        executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));

        backgroundExecutor =
                newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));

        //注册回调方法,接收处理节点发来的消息
        addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
        addSubscriber(DEVICE_STATUS_CHANGE, this::handleDeviceStatusChangeEvent);
        addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
        addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
        addSubscriber(PORT_UPDATE, this::handlePortEvent);
        addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
        addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);

        // start anti-entropy thread;启动反熵定时任务
        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                               initialDelaySec, periodSec, TimeUnit.SECONDS);

        ……
    }
    
    private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
        clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
    }

如上面所示,在GossipDeviceStore的activate方法中主要做了两件事:

  1. 申请线程池资源,启动反熵定时任务,5秒执行一次
  2. 添加消息处理器,接收其他节点的发来的消息并进行处理

createOrUpdateDevice

在有了activate方法的准备工作后,再看一下数据在进行变更时的具体过程。进入createOrUpdateDevice方法。

 @Override
    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                                         DeviceId deviceId,
                                                         DeviceDescription deviceDescription) {
        NodeId localNode = clusterService.getLocalNode().id();
        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
        boolean isMaster = localNode.equals(deviceNode);

        // Process device update only if we're the master,
        // otherwise signal the actual master.
        DeviceEvent deviceEvent = null;

        // If this node is the master for the device, acquire a new timestamp. Otherwise,
        // use a 0,0 or tombstone timestamp to create the device if it doesn't already exist.
        Timestamp newTimestamp;
        try {
            newTimestamp = isMaster
                    ? deviceClockService.getTimestamp(deviceId)
                    : removalRequest.getOrDefault(deviceId, DEFAULT_TIMESTAMP);
        } catch (IllegalStateException e) {
            newTimestamp = removalRequest.getOrDefault(deviceId, DEFAULT_TIMESTAMP);
            isMaster = false;
        }
        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
        final Timestamped<DeviceDescription> mergedDesc;
        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);

        synchronized (device) {
            //对device信息进行数据更新,操作节点中的MAP
            deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
            if (deviceEvent == null) {
                return null;
            }
            mergedDesc = device.get(providerId).getDeviceDesc();
        }

        // If this node is the master for the device, update peers.
        if (isMaster) {
            //此节点为主节点时,向集群中的所有节点发送通知
            log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
                    providerId, deviceId);
            notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
        }
        notifyDelegateIfNotNull(deviceEvent);

        return deviceEvent;
    }

围绕gossip的主流程,createOrUpdateDevice方法中先是调用了节点中的方法,对device信息存储的map进行了数据更新,数据更新后调用了notifyPeers向集群中的所有节点发送了广播消息,进行数据变更通知。

通知消息发送代码片段如下:

    private void notifyPeers(InternalDeviceEvent event) {
        broadcastMessage(DEVICE_UPDATE, event);
    }
    
    private void broadcastMessage(MessageSubject subject, Object event) {
        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
    }

看到这里再结合一下gossip的理论知识:数据在发生变更时会进行传播。

handleDeviceEvent

顺着消息的流程再看一下处理消息的实现。
根据初始化时的事件注册代码得到DEVICE_UPDATE的处理代码:

addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);

也就是GossipDeviceStore中的handleDeviceEvent方法

    private void handleDeviceEvent(InternalDeviceEvent event) {
        ProviderId providerId = event.providerId();
        DeviceId deviceId = event.deviceId();
        //从消息中获取出设备描述信息
        Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();

        try {
            //调用内部方法更新本节点中的维护的设备信息
            notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
                    deviceDescription));
        } catch (Exception e) {
            log.warn("Exception thrown handling device update", e);
        }
    }

节点收到DEVICE_UPDATE消息后会直接调用本节点中的方法对数据进行更新,源码如下:

    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
                                                     DeviceId deviceId,
                                                     Timestamped<DeviceDescription> deltaDesc) {

        // Collection of DeviceDescriptions for a Device
        Map<ProviderId, DeviceDescriptions> device
                = getOrCreateDeviceDescriptionsMap(deviceId);
        
        //对要操作的设备加锁操作
        synchronized (device) {
            // locking per device
            //已删除的设备不处理
            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
                log.debug("Ignoring outdated event: {}", deltaDesc);
                return null;
            }
            //获取出本节点维护的设备信息,没有就用传入的数据创建一个
            DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);

            final Device oldDevice = devices.get(deviceId);
            final Device newDevice;
    
            //发来的DeviceDesc较新,则进行替换并组装到newDevice中
            if (deltaDesc == descs.getDeviceDesc() ||
                    deltaDesc.isNewer(descs.getDeviceDesc())) {
                // on new device or valid update
                descs.putDeviceDesc(deltaDesc);
                newDevice = composeDevice(deviceId, device);
            } else {
                // outdated event, ignored.
                return null;
            }
            
            
            
            if (oldDevice == null) {
                //本节点以前没有此设备的任何信息,就进行创建
                // REGISTER
                if (!deltaDesc.value().isDefaultAvailable()) {
                    return registerDevice(providerId, newDevice, deltaDesc.timestamp());
                }
                // ADD
                return createDevice(providerId, newDevice, deltaDesc.timestamp());
            } else {
                //更新设备信息为newDevice
                // UPDATE or ignore (no change or stale)
                return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp(),
                                    deltaDesc.value().isDefaultAvailable());
            }
        }
    }
    
    /**
     * Checks if given timestamp is superseded by removal request
     * with more recent timestamp.
     *
     * @param deviceId         identifier of a device
     * @param timestampToCheck timestamp of an event to check
     * @return true if device is already removed
     */
    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
        Timestamp removalTimestamp = removalRequest.get(deviceId);
        if (removalTimestamp != null &&
                removalTimestamp.compareTo(timestampToCheck) > 0) {
            // removalRequest is more recent
            return true;
        }
        return false;
    }

createOrUpdateDeviceInternal方法中主要对传入的Timestamped与本节点存储的信息进行对比,根据时间戳进行对比,保持本节点中所存储的信息一直是最新的

SendAdvertisementTask

上面的DEVICE_UPDATE消息是设备信息变更后主动发送通知的处理流程。如仅有这一种变更通知,对于集群中节点会动态增删的场景则还不一定能保证数据的最终一致性。

这时,基于Gossip协议实现的SendAdvertisementTask方法则是核心。再回顾一下它的触发条件:

private long initialDelaySec = 5;
private long periodSec = 5;
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), initialDelaySec, periodSec, TimeUnit.SECONDS);

每5秒执行一次SendAdvertisementTask,也就是前文理论知识中提及到的传播。看一下具体实现:

    private final class SendAdvertisementTask implements Runnable {

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                log.debug("Interrupted, quitting");
                return;
            }

            try {
                //本节点
                final NodeId self = clusterService.getLocalNode().id();
                //集群中的所有节点
                Set<ControllerNode> nodes = clusterService.getNodes();

                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
                        .transform(toNodeId())
                        .toList();
                
                //集群中只有一个节点,不进行传播
                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
                    log.trace("No other peers in the cluster.");
                    return;
                }

                //随机选择一个集群中的其它节点
                NodeId peer;
                do {
                    int idx = RandomUtils.nextInt(0, nodeIds.size());
                    peer = nodeIds.get(idx);
                } while (peer.equals(self));
                
                //创建一个公告消息
                DeviceAntiEntropyAdvertisement ad = createAdvertisement();

                if (Thread.currentThread().isInterrupted()) {
                    log.debug("Interrupted, quitting");
                    return;
                }
                
                //向选择的节点进行单播,传递公告消息
                try {
                    unicastMessage(peer, DEVICE_ADVERTISE, ad);
                } catch (IOException e) {
                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
                    return;
                }
            } catch (Exception e) {
                // catch all Exception to avoid Scheduled task being suppressed.
                log.error("Exception thrown while sending advertisement", e);
            }
        }
    }

将以上代码总结一下:
SendAdvertisementTask每间隔5秒执行一次,主要功能为:根据当前的设备信息生成一条公告消息,并在集群中随机选择一个节点,将公告消息进行传递

公告消息生成代码如下:

    private DeviceAntiEntropyAdvertisement createAdvertisement() {
        //当前节点
        final NodeId self = clusterService.getLocalNode().id();
        
        final int numDevices = deviceDescs.size();
        //生成公告消息map
        Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
        final int portsPerDevice = 8; // random factor to minimize reallocation
        Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
        Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);

        deviceDescs.forEach((deviceId, devDescs) -> {
            //遍历每个设备生成公告消息
            // for each Device...
            synchronized (devDescs) {
                
                //填充离线设备ID
                // send device offline timestamp
                Timestamp lOffline = this.offline.get(deviceId);
                if (lOffline != null) {
                    adOffline.put(deviceId, lOffline);
                }
                
                
                for (Entry<ProviderId, DeviceDescriptions>
                        prov : devDescs.entrySet()) {

                    // for each Provider Descriptions...
                    final ProviderId provId = prov.getKey();
                    final DeviceDescriptions descs = prov.getValue();
                    //放入设备描述信息的时间戳
                    adDevices.put(new DeviceFragmentId(deviceId, provId),
                                  descs.getDeviceDesc().timestamp());
                    
                    for (Entry<PortNumber, Timestamped<PortDescription>>
                            portDesc : descs.getPortDescs().entrySet()) {
                        //放入设备端口信息的时间戳
                        final PortNumber number = portDesc.getKey();
                        adPorts.put(new PortFragmentId(deviceId, provId, number),
                                    portDesc.getValue().timestamp());
                    }
                }
            }
        });
        //生成反熵公告消息
        return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
    }

与前文理论知识部分提到的一样,为了提高反熵消息在节点中传播的性能,传播的消息一般仅传播摘要消息,这里的createAdvertisement方法中仅传递时间戳也是一种很好的实现方法。

handleDeviceAdvertisement

再看一下节点收到公告消息的处理细节,即GossipDeviceStore初始化部分的DEVICE_ADVERTISE消息,处理方法为handleDeviceAdvertisement

addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);

源码如下:

    /**
     * Responds to anti-entropy advertisement message.
     * <p>
     * Notify sender about out-dated information using regular replication message.
     * Send back advertisement to sender if not in sync.
     *
     * @param advertisement to respond to
     */
    private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
        /*
         * NOTE that when an instance rejoins the cluster, it will generate
         * device events and send to the local apps through the delegate. This
         * approach might be not the best if the apps are not enough robust or
         * if there is no proper coordination in the cluster. Also, note that
         * any ECMap will act on the same way during the bootstrap process
         */
        final NodeId sender = advertisement.sender();

        Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
        Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
        Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());

        // Fragments to request
        Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
        Collection<PortFragmentId> reqPorts = new ArrayList<>();
        
        //遍历本节点中的设备
        for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
            final DeviceId deviceId = de.getKey();
            final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();

            //对遍历的设备加锁处理
            synchronized (lDevice) {
                // latestTimestamp across provider
                // Note: can be null initially
                // 从下线设备MAP中获取到此设备,可能为空
                Timestamp localLatest = offline.get(deviceId);

                // handle device Ads
                for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
                    final ProviderId provId = prov.getKey();
                    final DeviceDescriptions lDeviceDescs = prov.getValue();

                    final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);


                    Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
                    Timestamp advDevTimestamp = devAds.get(devFragId);

                    if (advDevTimestamp == null || lProvDevice.isNewerThan(
                            advDevTimestamp)) {
                        // remote does not have it or outdated, suggest
                        log.trace("send to {} device update {} for {}", sender, lProvDevice, deviceId);
                        // 对端节点的device信息已过期,使用DEVICE_UPDATE将设备信息推送给对端
                        notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
                    } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
                        // local is outdated, request
                        log.trace("need update {} < {} for device {} from {}", lProvDevice.timestamp(),
                                advDevTimestamp, deviceId, sender);
                        // 本地节点的数据已过期,添加到请求集合中以拉取新设备信息
                        reqDevices.add(devFragId);
                    }

                    // handle port Ads
                    ……
                    ……
                    ……
                    // remove device Ad already processed
                    devAds.remove(devFragId);

                    // find latest and update
                    final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
                    if (localLatest == null ||
                            providerLatest.compareTo(localLatest) > 0) {
                        localLatest = providerLatest;
                    }
                } // end local provider loop

                // checking if remote timestamp is more recent.
                Timestamp rOffline = offlineAds.get(deviceId);
                if (localLatest == null || (rOffline != null && rOffline.compareTo(localLatest) > 0)) {
                    // 对比对端节点与本节点的离线设备数据,识别出已过期数据
                    // remote offline timestamp suggests that the
                    // device is off-line
                    log.trace("remote offline timestamp from {} suggests that the device {} is off-line",
                            sender, deviceId);
                    // 本地节点数据已过期,本节点数据将其标注为已下线
                    markOfflineInternal(deviceId, rOffline);
                }

                Timestamp lOffline = offline.get(deviceId);
                if (lOffline != null && rOffline == null) {
                    // locally offline, but remote is online, suggest offline
                    log.trace("suggest to {} sthat the device {} is off-line", sender, deviceId);
                    // 本节点中device已离线,对端节点device仍在线,发送DEVICE_STATUS_CHANGE消息通知对端节点设备已离线
                    notifyPeer(sender, new InternalDeviceStatusChangeEvent(deviceId, lOffline, false));
                }

                // remove device offline Ad already processed
                offlineAds.remove(deviceId);
            } // end local device loop
        } // device lock

        // If there is any Ads left, request them
        log.trace("Ads left {}, {}", devAds, portAds);
        reqDevices.addAll(devAds.keySet());
        reqPorts.addAll(portAds.keySet());

        if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
            log.trace("Nothing to request to remote peer {}", sender);
            //没有需要向外发送的pull消息,处理流程结束
            return;
        }

        log.debug("Need to sync {} {}", reqDevices, reqPorts);

        // 2-way Anti-Entropy for now
        try {
            // 本节点有需要pull的消息,向对端信息发送DEVICE_ADVERTISE消息以被动获取最新数据
            unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
        } catch (IOException e) {
            log.error("Failed to send response advertisement to " + sender, e);
        }

    }

在handleDeviceAdvertisement方法中定义了处理DEVICE_ADVERTISE消息的详细过程,即:节点间进行数据交换的具体过程。

收到远程节点发送的DEVICE_ADVERTISE消息后,本节点会与远程节点中的数据做对比,如本节点中的数据较新,本节点会将它所拥有的较新设备数据发送给对端节点;如本节点中的数据较旧,本节点也会向对端节点发送一个DEVICE_ADVERTISE消息以获得对端节点中的较新数据。

总结

ONOS中设备信息的存储基于GOSSIP协议实现,节点间进行数据交换主要有定时随机传播数据变更后主动广播两种方式。

通信的实现上基于atomix组件(底层基于NETTY)使得用户无需关心具体的通信过程。在节点数据交换的过程中,主要使用PUSH的方式进行数据交换。首次PUSH时本节点仅传递本节点的摘要信息(基于时间戳)推送到对端节点,对端节点收到摘要数据后进行对比,将新的数据PUSH到源节点。

基于gossip实现的设备信息存储,数据的落地都在各节点的内存中,这种方式可以向本节点的服务提供快速的访问,通过节点间数据的不定时传播也达到了数据的最终一致性。结合ONOS中某一个设备仅能拥有一个主节点的背景,设备信息使用gossip协议实现确实是一种比较好的选择。

文章来源:https://blog.csdn.net/puhaiyang/article/details/135045690
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。