前面我们分析了Broker和NonBroker通信,也分析了NonBroker和NonBroker通信,这里我们分析一下Broker和Broker通信过程。
我们直接一步到位,看NodeConnector的创建。
std::pair<Ref<NodeConnector>, IpczResult> CreateConnector(
Ref<Node> node,
Ref<DriverTransport> transport,
IpczConnectNodeFlags flags,
const std::vector<Ref<Portal>>& initial_portals,
Ref<NodeLink> broker_link,
NodeConnector::ConnectCallback callback) {
const bool from_broker = node->type() == Node::Type::kBroker;
const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
if (from_broker) {
DriverMemoryWithMapping memory =
NodeLinkMemory::AllocateMemory(node->driver());
if (!memory.mapping.is_valid()) {
return {nullptr, IPCZ_RESULT_RESOURCE_EXHAUSTED};
}
if (to_broker) {
return {MakeRefCounted<NodeConnectorForBrokerToBroker>(
std::move(node), std::move(transport), std::move(memory),
flags, initial_portals, std::move(callback)),
IPCZ_RESULT_OK};
}
......
return {nullptr, IPCZ_RESULT_INVALID_ARGUMENT};
}
当前节点是broker的情况下,对端节点也是broker,就是broker to broker请求,创建NodeConnectorForBrokerToBroker。
NodeConnectorForBrokerToBroker
// NodeConnector:
bool Connect() override {
DVLOG(4) << "Sending direct ConnectFromBrokerToBroker from broker "
<< local_name_.ToString() << " with " << num_portals()
<< " initial portals";
ABSL_ASSERT(node_->type() == Node::Type::kBroker);
msg::ConnectFromBrokerToBroker connect;
connect.params().name = local_name_;
connect.params().protocol_version = msg::kProtocolVersion;
connect.params().num_initial_portals =
checked_cast<uint32_t>(num_portals());
connect.params().buffer = connect.AppendDriverObject(
link_memory_allocation_.memory.TakeDriverObject());
connect.params().padding = 0;
return IPCZ_RESULT_OK == transport_->Transmit(connect);
}
从broker链接另一个broker的消息对象为ConnectFromBrokerToBroker, ConnectFromBrokerToBroker的参数如下
name: 当前进程的NodeName,后面我们称这个进程为A进程,接收链接的进程为B进程。
protocol_version: 协议版本
num_initial_portals: 要初始化的RouterLink
buffer: 共享内存对象
padding: 用于对齐。
我们看对端收到消息如何处理
// NodeMessageListener overrides:
bool OnConnectFromBrokerToBroker(
msg::ConnectFromBrokerToBroker& connect) override {
const NodeName& remote_name = connect.params().name;
DVLOG(4) << "Accepting ConnectFromBrokerToBroker on broker "
<< local_name_.ToString() << " from other broker "
<< remote_name.ToString();
const LinkSide this_side =
remote_name < local_name_ ? LinkSide::kA : LinkSide::kB;
DriverMemory their_memory(
connect.TakeDriverObject(connect.params().buffer));
DriverMemoryMapping primary_buffer_mapping =
this_side.is_side_a() ? std::move(link_memory_allocation_.mapping)
: their_memory.Map();
if (!primary_buffer_mapping.is_valid()) {
return false;
}
Ref<NodeLink> link = NodeLink::CreateActive(
node_, this_side, local_name_, remote_name, Node::Type::kBroker,
connect.params().protocol_version, transport_,
NodeLinkMemory::Create(node_, std::move(primary_buffer_mapping)));
AcceptConnection({.link = link, .broker = link},
connect.params().num_initial_portals);
return true;
}
创建NodeLink, 注意这里面LinkSide 根据NodeName做比较确定。 共享内存使用LinkSide::kA端提供的共享内存。 对应的Connection 的broker 指向核对段的链接。
到这里我们所有的链接类型就分析完了,下面对NodeConnector 和 Connection的几种情况做一下总结。
Broker 和 NonBroker建立链接(假设A是Broker, B是NonBroker):
NonBroker 和NonBroker建立链接: (假设主动发起请求的NonBroker 为B, 另一个NonBroker为C, Broker 为A)
以上过程建立了三个链接:
C->A:Connection.link 为C到A的NodeLink, Connection.broker 为C到A的NodeLink。
A->C: Connection.link 为A到C的NodeLink, Connection.broker 为空。
B->C: Connection.link 为B到C的NodeLink, Connection.broker 为 B到A的NodeLink。
C->B: Connection.link 为C到B的NodeLink, Connection.broker 为 C到A的NodeLink。
也就是NonBroker<->NonBroker 的链接Connection->broker 都是和Broker的NodeLink。
Broker 和Broker建立链接
NodeConnectorForBrokerToBroker: 用于Broker 和Broker 建立链接, Connection.link 为指向对端的NodeLink, Connection.broker 也是指向对端的NodeLink。
到此几种链接我们都分析过了。下面就可以分析代理消除了。