chromium通信系统-ipcz系统(七)-ipcz系统代码实现-跨Node通信-NonBroker和NonBroker通信

发布时间:2024年01月01日

chromium通信系统-ipcz系统(六)-ipcz系统代码实现-跨Node通信-基础通信 一文中我们分析了broker 和 nonbroker 通信的过程。本文我们来分析NonBroker 和NonBroker的通信过程,同样以单元测试为例子分析。

mojo/core/invitation_unittest.cc

 951 DEFINE_TEST_CLIENT(NonBrokerToNonBrokerHost) {
 952   MojoHandle invitation = AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_NONE);
 953   MojoHandle test = ExtractPipeFromInvitation(invitation);
 954 
 955   MojoHandle pipe_for_client;
 956   EXPECT_EQ("aaa", ReadMessageWithHandles(test, &pipe_for_client, 1));
 957 
 958   MojoHandle client;
 959   base::Process client_process =
 960       LaunchChildTestClient("NonBrokerToNonBrokerClient", &client, 1,
 961                             MOJO_SEND_INVITATION_FLAG_SHARE_BROKER);
 962 
 963   // Forward the pipe from the test to the client, then wait. We're done
 964   // whenever the client acks. The success of the test is determined by
 965   // the outcome of interactions between the test and the client process.
 966   WriteMessageWithHandles(client, "ddd", &pipe_for_client, 1);
 967 
 968   // Wait for a signal from the test to let us know we can terminate.
 969   EXPECT_EQ("bye", ReadMessage(test));
 970   WriteMessage(client, "bye");
 971   WaitForProcessToTerminate(client_process);
 972 
 973   MojoClose(client);
 974   MojoClose(test);
 975 }

在TEST_F(MAYBE_InvitationTest, NonBrokerToNonBroker) 这个单元测试中,A和B进程是Broker和NonBroker 通信,B进程启动后又调用LaunchChildTestClient 启动C进程(代码960-961行),我们从这里入手来分析B和C如何通信。LaunchChildTestClient我们已经分析过了,不过在broker 启动非broker进程中第四个参数是MOJO_SEND_INVITATION_FLAG_NONE,而这里是MOJO_SEND_INVITATION_FLAG_SHARE_BROKER,通过值的名称我们可以看出来,这里是要建立共享broker。LaunchChildTestClient前边的调用流程和Broker 启动NonBroker没有区别,我们重点分析对MOJO_SEND_INVITATION_FLAG_SHARE_BROKER的特殊处理, 直接看建立Connector的过程。

third_party/ipcz/src/ipcz/node_connector.cc

528 // static
529 IpczResult NodeConnector::ConnectNode(
530     Ref<Node> node,
531     Ref<DriverTransport> transport,
532     IpczConnectNodeFlags flags,
533     const std::vector<Ref<Portal>>& initial_portals,
534     ConnectCallback callback) {
535   const bool from_broker = node->type() == Node::Type::kBroker;
536   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
537   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
538   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
539   Ref<NodeLink> broker_link = node->GetBrokerLink();
      ......
546 
547   auto [connector, result] = CreateConnector(
548       std::move(node), std::move(transport), flags, initial_portals,
549       std::move(broker_link), std::move(callback));
550   if (result != IPCZ_RESULT_OK) {
551     return result;
552   }
553 
554   if (!share_broker && !connector->ActivateTransport()) {
555     // Note that when referring another node to our own broker, we don't
556     // activate the transport, since the transport will be passed to the broker.
557     // See NodeConnectorForReferrer.
558     return IPCZ_RESULT_UNKNOWN;
559   }
560 
561   if (!connector->Connect()) {
562     return IPCZ_RESULT_UNKNOWN;
563   }
564 
565   return IPCZ_RESULT_OK;
566 }

先分析B进程
函数参数transport是指B和C的传输点。flags 为IPCZ_CONNECT_NODE_SHARE_BROKER,所以538行share_broker为真。547行创建Connector。 与Broker 和NonBroker通信不同,这里554行并不会激活传输点。 561行直接调用Connector的Connect方法。我们先来看CreateConnector方法创建哪一类Connector。

471 std::pair<Ref<NodeConnector>, IpczResult> CreateConnector(
472     Ref<Node> node,
473     Ref<DriverTransport> transport,
474     IpczConnectNodeFlags flags,
475     const std::vector<Ref<Portal>>& initial_portals,
476     Ref<NodeLink> broker_link,
477     NodeConnector::ConnectCallback callback) {
478   const bool from_broker = node->type() == Node::Type::kBroker;
479   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
480   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
481   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
      ......
508 
509   if (share_broker) {
510     return {MakeRefCounted<NodeConnectorForReferrer>(
511                 std::move(node), std::move(transport), flags, initial_portals,
512                 std::move(broker_link), std::move(callback)),
513             IPCZ_RESULT_OK};
514   }
515 
     ......
522 
523   return {nullptr, IPCZ_RESULT_INVALID_ARGUMENT};
524 }

函数510行创建了NodeConnectorForReferrer。

  NodeConnectorForReferrer(Ref<Node> node,
                           Ref<DriverTransport> transport,
                           IpczConnectNodeFlags flags,
                           std::vector<Ref<Portal>> waiting_portals,
                           Ref<NodeLink> broker_link,
                           ConnectCallback callback)
      : NodeConnector(std::move(node),
                      /*transport=*/nullptr,
                      flags,
                      std::move(waiting_portals),
                      std::move(callback)),
        transport_for_broker_(std::move(transport)),
        broker_link_(std::move(broker_link)) {}


NodeConnectorForReferrer的成员变量transport_for_broker_代表B和C的传输点。 broker_link_则表示B->A(Broker) 的链接。

166   // NodeConnector:
167   bool Connect() override {
168     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
169     if (!broker_link_) {
170       // If there's no broker link yet, wait for one.
171       node_->WaitForBrokerLinkAsync(
172           [connector = WrapRefCounted(this)](Ref<NodeLink> broker_link) {
173             connector->broker_link_ = std::move(broker_link);
174             connector->Connect();
175           });
176       return true;
177     }
178 
179     broker_link_->ReferNonBroker(
180         std::move(transport_for_broker_), checked_cast<uint32_t>(num_portals()),
181         [connector = WrapRefCounted(this), broker = broker_link_](
182             Ref<NodeLink> link_to_referred_node,
183             uint32_t remote_num_initial_portals) {
184           if (link_to_referred_node) {
185             connector->AcceptConnection(
186                 {.link = link_to_referred_node, .broker = broker},
187                 remote_num_initial_portals);
188           } else {
189             connector->RejectConnection();
190           }
191         });
192     return true;
193   }

NodeConnectorForReferrer->Connect()函数:
169到177行如果A和B还没完全建立链接,则调用node_->WaitForBrokerLinkAsync异步等待链接建立后再执行NodeConnectorForReferrer->Connect()函数。
179-191行broker_link_->ReferNonBroker() 函数建立链接。181-191行为一个回调函数,我们后面分析。

 223 void NodeLink::ReferNonBroker(Ref<DriverTransport> transport,
 224                               uint32_t num_initial_portals,
 225                               ReferralCallback callback) {
 226   ABSL_ASSERT(node_->type() == Node::Type::kNormal &&
 227               remote_node_type_ == Node::Type::kBroker);
 228 
 229   uint64_t referral_id;
 230   {
 231     absl::MutexLock lock(&mutex_);
 232     for (;;) {
 233       referral_id = next_referral_id_++;
 234       auto [it, inserted] =
 235           pending_referrals_.try_emplace(referral_id, std::move(callback));
 236       if (inserted) {
 237         break;
 238       }
 239     }
 240   }
 241 
 242   msg::ReferNonBroker refer;
 243   refer.params().referral_id = referral_id;
 244   refer.params().num_initial_portals = num_initial_portals;
 245   refer.params().transport =
 246       refer.AppendDriverObject(transport->TakeDriverObject());
 247   Transmit(refer);
 248 }

注意这个NodeLink是B->A的NodeLink,A是Broker。
229 -240行生成一个referral_id,用于维护请求和回调函数的关系。
242-247行创建ReferNonBroker消息体,并发起传输。ReferNonBroker有三个参数:referral_id用于找到对应回调;num_initial_portals表示B和C要初始建立的RouterLink数量;transport为B->C的传输端点。
我们来看A如何处理该消息

 366 bool NodeLink::OnReferNonBroker(msg::ReferNonBroker& refer) {
 367   if (remote_node_type_ != Node::Type::kNormal ||
 368       node()->type() != Node::Type::kBroker) {
 369     return false;
 370   }
 371 
 372   DriverObject transport = refer.TakeDriverObject(refer.params().transport);
 373   if (!transport.is_valid()) {
 374     return false;
 375   }
 376 
 377   DriverMemoryWithMapping link_memory =
 378       NodeLinkMemory::AllocateMemory(node()->driver());
 379   DriverMemoryWithMapping client_link_memory =
 380       NodeLinkMemory::AllocateMemory(node()->driver());
      ......
 390 
 391   return NodeConnector::HandleNonBrokerReferral(
 392       node(), refer.params().referral_id, refer.params().num_initial_portals,
 393       WrapRefCounted(this),
 394       MakeRefCounted<DriverTransport>(std::move(transport)),
 395       std::move(link_memory), std::move(client_link_memory));
 396 }

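函数很简单,377-380行两次调用NodeLinkMemory::AllocateMemory(),分配了两块带映射的共享内存:从后面的代码可以看到,link_memory用于A和C之间的NodeLink,client_link_memory用于B和C之间的NodeLink。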
391行调用 NodeConnector::HandleNonBrokerReferral() 函数进一步处理。注意这里的transport是通过B->C的文件描述符反序列化出来的(文件描述符通过socket传递),所以这里A->C也具备了信道。

bool NodeConnector::HandleNonBrokerReferral(
    Ref<Node> node,
    uint64_t referral_id,
    uint32_t num_initial_portals,
    Ref<NodeLink> referrer,
    Ref<DriverTransport> transport_to_referred_node,
    DriverMemoryWithMapping link_memory,
    DriverMemoryWithMapping client_link_memory) {
  ABSL_ASSERT(node->type() == Node::Type::kBroker);
  auto connector = MakeRefCounted<NodeConnectorForBrokerReferral>(
      std::move(node), referral_id, num_initial_portals, std::move(referrer),
      std::move(transport_to_referred_node), std::move(link_memory),
      std::move(client_link_memory));

  // The connector effectively owns itself and lives only until its transport is
  // disconnected or it receives a greeting from the referred node.
  return connector->ActivateTransport();
}

HandleNonBrokerReferral函数先创建了NodeConnectorForBrokerReferral对象,这是一个Connector对象,然后调用connector->ActivateTransport()激活传输点。先来看NodeConnectorForBrokerReferral对象创建

  NodeConnectorForBrokerReferral(Ref<Node> node,
                                 uint64_t referral_id,
                                 uint32_t num_initial_portals,
                                 Ref<NodeLink> referrer,
                                 Ref<DriverTransport> transport,
                                 DriverMemoryWithMapping link_memory,
                                 DriverMemoryWithMapping client_link_memory)
      : NodeConnector(std::move(node),
                      std::move(transport),
                      IPCZ_NO_FLAGS,
                      /*waiting_portals=*/{},
                      /*callback=*/nullptr),
        referral_id_(referral_id),
        num_initial_portals_(num_initial_portals),
        referrer_(std::move(referrer)),
        link_memory_(std::move(link_memory)),
        client_link_memory_(std::move(client_link_memory)) {
    ABSL_HARDENING_ASSERT(link_memory_.mapping.is_valid());
    ABSL_HARDENING_ASSERT(client_link_memory_.mapping.is_valid());
  }

NodeConnectorForBrokerReferral的成员变量如下:

  • referral_id_ 对端请求的id
  • num_initial_portals_ B和C要初始建立的RouterLink数量
  • referrer_: A->B的链接
  • transport:A->C的传输点
  • link_memory_ : Broker创建的共享内存对象
  • client_link_memory_: Broker创建的共享内存对象
    到目前为止建立的信道:
    A<->B, A<->C(B已经把原本连向C的传输端点交给了A,B和C之间的直连信道要等后面由A创建)。

传输点激活的代码我们在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。

我们接下来看C进程接收邀请。

DEFINE_TEST_CLIENT(NonBrokerToNonBrokerClient) {
  MojoHandle invitation =
      AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER);
 ......
}

这里面标志为MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER, AcceptInvitation在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。我们主要关注对MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER标志的处理。同样直接看Connect的过程

528 // static
529 IpczResult NodeConnector::ConnectNode(
530     Ref<Node> node,
531     Ref<DriverTransport> transport,
532     IpczConnectNodeFlags flags,
533     const std::vector<Ref<Portal>>& initial_portals,
534     ConnectCallback callback) {
535   const bool from_broker = node->type() == Node::Type::kBroker;
536   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
537   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
538   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
539   Ref<NodeLink> broker_link = node->GetBrokerLink();
540   if (share_broker && (from_broker || to_broker || inherit_broker)) {
541     return IPCZ_RESULT_INVALID_ARGUMENT;
542   }
543   if ((to_broker || from_broker) && (inherit_broker || share_broker)) {
544     return IPCZ_RESULT_INVALID_ARGUMENT;
545   }
546 
547   auto [connector, result] = CreateConnector(
548       std::move(node), std::move(transport), flags, initial_portals,
549       std::move(broker_link), std::move(callback));
550   if (result != IPCZ_RESULT_OK) {
551     return result;
552   }
553 
554   if (!share_broker && !connector->ActivateTransport()) {
555     // Note that when referring another node to our own broker, we don't
556     // activate the transport, since the transport will be passed to the broker.
557     // See NodeConnectorForReferrer.
558     return IPCZ_RESULT_UNKNOWN;
559   }
560 
561   if (!connector->Connect()) {
562     return IPCZ_RESULT_UNKNOWN;
563   }
564 
565   return IPCZ_RESULT_OK;
566 }

这里inherit_broker表示继承broker,也说明该节点自己不是broker(中心)节点。
函数先创建Connector,然后激活传输点,最后请求建立链接,这个函数我们分析好几遍了,直接先看Connector的创建。

471 std::pair<Ref<NodeConnector>, IpczResult> CreateConnector(
472     Ref<Node> node,
473     Ref<DriverTransport> transport,
474     IpczConnectNodeFlags flags,
475     const std::vector<Ref<Portal>>& initial_portals,
476     Ref<NodeLink> broker_link,
477     NodeConnector::ConnectCallback callback) {
478   const bool from_broker = node->type() == Node::Type::kBroker;
479   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
480   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
481   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
      ......
516   if (inherit_broker) {
517     return {MakeRefCounted<NodeConnectorForReferredNonBroker>(
518                 std::move(node), std::move(transport), flags, initial_portals,
519                 std::move(callback)),
520             IPCZ_RESULT_OK};
521   }
522 
523   return {nullptr, IPCZ_RESULT_INVALID_ARGUMENT};
524 }
525 
526 }  // namespace
527 

这里创建的Connector为NodeConnectorForReferredNonBroker。我们看下它是如何创建的

203   NodeConnectorForReferredNonBroker(Ref<Node> node,
204                                     Ref<DriverTransport> transport,
205                                     IpczConnectNodeFlags flags,
206                                     std::vector<Ref<Portal>> waiting_portals,
207                                     ConnectCallback callback)
208       : NodeConnector(std::move(node),
209                       std::move(transport),
210                       flags,
211                       std::move(waiting_portals),
212                       std::move(callback)) {}

比较简单, 传输点的激活我们在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。接下来看一下请求链接的过程

216   // NodeConnector:
217   bool Connect() override {
218     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
219     msg::ConnectToReferredBroker connect;
220     connect.params().protocol_version = msg::kProtocolVersion;
221     connect.params().num_initial_portals =
222         checked_cast<uint32_t>(num_portals());
223     return IPCZ_RESULT_OK == transport_->Transmit(connect);
224   }

创建了一个ConnectToReferredBroker对象,这个对象非常简单,只有协议版本(protocol_version)和要初始化的端口数(num_initial_portals)两个参数。由于B进程并没有激活传输点,而是把它交给了A进程,所以这条消息会被A进程读取。我们来看A进程如何处理。

回到A进程, NodeConnectorForBrokerReferral会收到对应消息。

314   // NodeMessageListener overrides:
315   bool OnConnectToReferredBroker(
316       msg::ConnectToReferredBroker& connect_to_broker) override {
317     DVLOG(4) << "Accepting ConnectToReferredBroker on broker "
318              << broker_name_.ToString() << " from new referred node "
319              << referred_node_name_.ToString();
320 
321     // Ensure this NodeConnector stays alive until this method returns.
322     // Otherwise the last reference may be dropped when the new NodeLink below
323     // takes over listening on `transport_`.
324     Ref<NodeConnector> self(this);
325 
326     // First, accept the new non-broker client on this broker node. There are
327     // no initial portals on this link, as this link was not established
328     // directly by the application. Note that this takes over listsening on
329     // `transport_`
330     const uint32_t protocol_version = std::min(
331         connect_to_broker.params().protocol_version, msg::kProtocolVersion);
        // 建立A->C的NodeLink
332     Ref<NodeLink> link_to_referree = NodeLink::CreateActive(
333         node_, LinkSide::kA, broker_name_, referred_node_name_,
334         Node::Type::kNormal, protocol_version, transport_,
335         NodeLinkMemory::Create(node_, std::move(link_memory_.mapping)));
336     AcceptConnection({.link = link_to_referree}, /*num_remote_portals=*/0);
337 
338     // Now we can create a new link to introduce both clients -- the referrer
339     // and the referree -- to each other.
        // 创建一对传输点
340     auto [referrer, referree] = DriverTransport::CreatePair(
341         node_->driver(), referrer_->transport().get(), transport_.get());
342 
343     // Give the referred node a reply with sufficient details for it to
344     // establish links to both this broker and the referrer simultaneously.
345     //
346     // SUBTLE: It's important that this message is sent before the message to
347     // the referrer below. Otherwise the referrer might begin relaying messages
348     // through the broker to the referree before this handshake is sent to the
349     // referree, which would be bad.
350     msg::ConnectToReferredNonBroker connect;
351     connect.params().name = referred_node_name_;
352     connect.params().broker_name = broker_name_;
353     connect.params().referrer_name = referrer_->remote_node_name();
354     connect.params().broker_protocol_version = protocol_version;
355     connect.params().referrer_protocol_version =
356         referrer_->remote_protocol_version();
357     connect.params().num_initial_portals = num_initial_portals_;
358     connect.params().broker_link_buffer =
359         connect.AppendDriverObject(link_memory_.memory.TakeDriverObject());
360     connect.params().referrer_link_transport =
361         connect.AppendDriverObject(referree->TakeDriverObject());
362     connect.params().referrer_link_buffer = connect.AppendDriverObject(
363         client_link_memory_.memory.Clone().TakeDriverObject());
        // 向C发送ConnectToReferredNonBroker 消息
364     link_to_referree->Transmit(connect);
365 
366     // Finally, give the referrer a repy which includes details of its new link
367     // to the referred node.
368     msg::NonBrokerReferralAccepted accepted;
369     accepted.params().referral_id = referral_id_;
370     accepted.params().protocol_version =
371         connect_to_broker.params().protocol_version;
372     accepted.params().num_initial_portals =
373         connect_to_broker.params().num_initial_portals;
374     accepted.params().name = referred_node_name_;
375     accepted.params().transport =
376         accepted.AppendDriverObject(referrer->TakeDriverObject());
377     accepted.params().buffer = accepted.AppendDriverObject(
378         client_link_memory_.memory.TakeDriverObject());
        // 向B发送NonBrokerReferralAccepted 消息
379     referrer_->Transmit(accepted);
380     return true;
381   }

函数332-336 建立A->C的NodeLink。340-341行创建一对传输点(这对传输点可以互相通信)
350行-364行向C发送一个ConnectToReferredNonBroker消息
368-379行向B发送一个NonBrokerReferralAccepted 消息。我们先来看一下ConnectToReferredNonBroker包含的参数。

ConnectToReferredNonBroker:

  • name: 为C分配的NodeName
  • broker_name: A进程为broker进程,这是它对应的NodeName
  • referrer_name: B进程Node的名字
  • broker_protocol_version: broker协议版本
  • referrer_protocol_version:B的协议版本
  • num_initial_portals 初始化端口数量
  • broker_link_buffer 和broker的共享内存
  • referrer_link_transport:创建的一对传输点的一端
  • referrer_link_buffer 和B的共享内存

再来看一下NonBrokerReferralAccepted包含的参数

  • referral_id:B的请求id
  • protocol_version 协议版本
  • num_initial_portals 初始化端口数
  • name: C的NodeName(取值为referred_node_name_)
  • transport:创建的一对传输点的另一端
  • buffer:共享内存

知道了消息体的结构,我们大概可以知道A进程帮助B、C进程创建了一个通信管道,这就是broker的作用,我们来具体看B、C进程如何处理消息。 先看C进程
third_party/ipcz/src/ipcz/node_connector.cc
NodeConnectorForReferredNonBroker

227   bool OnConnectToReferredNonBroker(
228       msg::ConnectToReferredNonBroker& connect) override {
        ......
234     DriverMemoryMapping broker_mapping =
235         DriverMemory(
236             connect.TakeDriverObject(connect.params().broker_link_buffer))
237             .Map();
238     DriverMemoryMapping referrer_mapping =
239         DriverMemory(
240             connect.TakeDriverObject(connect.params().referrer_link_buffer))
241             .Map();
242     Ref<DriverTransport> referrer_transport = MakeRefCounted<DriverTransport>(
243         connect.TakeDriverObject(connect.params().referrer_link_transport));
244     if (!broker_mapping.is_valid() || !referrer_mapping.is_valid() ||
245         !referrer_transport->driver_object().is_valid()) {
246       return false;
247     }
248 
249     // Ensure this NodeConnector stays alive until this method returns.
250     // Otherwise the last reference may be dropped when the new NodeLink takes
251     // over listening on `transport_`.
252     Ref<NodeConnector> self(this);
253     const uint32_t broker_protocol_version = std::min(
254         connect.params().broker_protocol_version, msg::kProtocolVersion);
255     auto broker_link = NodeLink::CreateActive(
256         node_, LinkSide::kB, connect.params().name,
257         connect.params().broker_name, Node::Type::kBroker,
258         broker_protocol_version, transport_,
259         NodeLinkMemory::Create(node_, std::move(broker_mapping)));
260     if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
261       node_->SetAllocationDelegate(broker_link);
262     }
263     node_->AddConnection(connect.params().broker_name,
264                          {
265                              .link = broker_link,
266                              .broker = broker_link,
267                          });
268 
269     const uint32_t referrer_protocol_version = std::min(
270         connect.params().referrer_protocol_version, msg::kProtocolVersion);
271     auto referrer_link = NodeLink::CreateInactive(
272         node_, LinkSide::kB, connect.params().name,
273         connect.params().referrer_name, Node::Type::kNormal,
274         referrer_protocol_version, std::move(referrer_transport),
275         NodeLinkMemory::Create(node_, std::move(referrer_mapping)));
276 
277     AcceptConnection({.link = referrer_link, .broker = broker_link},
278                      connect.params().num_initial_portals);
279     referrer_link->Activate();
280     return true;
281   }
	

252-267行创建了和broker(A进程)的NodeLink。
269-279行创建了C和B的NodeLink,并初始化端口,然后激活链接。

我们再来看B进程如何处理NonBrokerReferralAccepted消息
third_party/ipcz/src/ipcz/node_link.cc

398 bool NodeLink::OnNonBrokerReferralAccepted(
 399     msg::NonBrokerReferralAccepted& accepted) {
 400   if (remote_node_type_ != Node::Type::kBroker) {
 401     return false;
 402   }
 403 
 404   ReferralCallback callback;
 405   {
 406     absl::MutexLock lock(&mutex_);
 407     auto it = pending_referrals_.find(accepted.params().referral_id);
 408     if (it == pending_referrals_.end()) {
 409       return false;
 410     }
 411     callback = std::move(it->second);
 412     pending_referrals_.erase(it);
 413   }
 414 
 415   const uint32_t protocol_version =
 416       std::min(msg::kProtocolVersion, accepted.params().protocol_version);
 417   auto transport = MakeRefCounted<DriverTransport>(
 418       accepted.TakeDriverObject(accepted.params().transport));
 419   DriverMemoryMapping mapping =
 420       DriverMemory(accepted.TakeDriverObject(accepted.params().buffer)).Map();
 421   if (!transport->driver_object().is_valid() || !mapping.is_valid()) {
 422     // Not quite a validation failure if the broker simply failed to allocate
 423     // resources for this link. Treat it like a connection failure.
 424     callback(/*link=*/nullptr, /*num_initial_portals=*/0);
 425     return true;
 426   }
 427 
 428   Ref<NodeLink> link_to_referree = NodeLink::CreateInactive(
 429       node_, LinkSide::kA, local_node_name_, accepted.params().name,
 430       Node::Type::kNormal, protocol_version, std::move(transport),
 431       NodeLinkMemory::Create(node_, std::move(mapping)));
 432   callback(link_to_referree, accepted.params().num_initial_portals);
 433   link_to_referree->Activate();
 434   return true;
 435 }

创建了B->C的NodeLink,并激活。这个过程中还会回调callback。回顾一下回调函数

166   // NodeConnector:
167   bool Connect() override {
168     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
169     if (!broker_link_) {
170       // If there's no broker link yet, wait for one.
171       node_->WaitForBrokerLinkAsync(
172           [connector = WrapRefCounted(this)](Ref<NodeLink> broker_link) {
173             connector->broker_link_ = std::move(broker_link);
174             connector->Connect();
175           });
176       return true;
177     }
178 
179     broker_link_->ReferNonBroker(
180         std::move(transport_for_broker_), checked_cast<uint32_t>(num_portals()),
181         [connector = WrapRefCounted(this), broker = broker_link_](
182             Ref<NodeLink> link_to_referred_node,
183             uint32_t remote_num_initial_portals) {
184           if (link_to_referred_node) {
185             connector->AcceptConnection(
186                 {.link = link_to_referred_node, .broker = broker},
187                 remote_num_initial_portals);
188           } else {
189             connector->RejectConnection();
190           }
191         });
192     return true;
193   }

181-191行是回调函数:如果link_to_referred_node有效,则调用AcceptConnection 接受链接;否则调用RejectConnection 拒绝链接。

到这里我们大概理解了NonBroker和NonBroker通信的流程,是借助broker实现的,broker帮忙建立Transport,并且分配共享内存。

到这里NonBroker 和NonBroker通信我们就分析完了。

文章来源:https://blog.csdn.net/woai110120130/article/details/135282349
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。