Flutter为什么不需要子线程——Dart IO源码剖析(上)

发布时间:2024年01月18日

Dart IO 源码剖析

许多Flutter新手,特别是安卓、iOS原生开发转做Flutter的小伙伴,一直对Flutter 单线程模型开发APP倍感不解,他们总是喜欢本能的把网络请求、文件读写放到一个单独线程去做,因为“耗时操作会阻塞UI线程嘛”。于是,我看到有人把这些所谓耗时代码放到一个单独的Isolate中去做,美其名曰优化性能,提升帧率,殊不知这是耗费资源,降低性能。因为Isolate是内存隔离的,它比操作系统线程要更重,与其说它是Dart的线程,不如说它更像进程。当你在两个Isolate之间通信时,涉及内存的拷贝,频繁的交互,反而降低Dart 主隔离(root isloate)的性能,这也是官方并不太推荐你在非计算密集型任务中创建子隔离的原因。

虽然大家都知道Flutter中不用创建单独的隔离去发起网络IO,但是并没有资料详细解释为什么不需要,今天我们就通过剖析Dart VM底层源码,详细了解Dart IO的底层原理。

关于Dart IO 源码剖析,我会用两篇文章来介绍,本章以剖析文件IO为主,下一篇我们剖析网络IO。发车了,请系好安全带!!!

文件IO

Dart 侧

在Dart中,我们一般可以使用下面的代码将二进制数据写入到一个文件中:

File("test.bin").writeAsBytes([96,97]);

接下来,我们就沿着这条调用链,详细研究一下,当在Dart 上层写文件时,Dart VM到底发生了什么。

abstract interface class File implements FileSystemEntity {
...
 
    factory File(String path) {
        final IOOverrides? overrides = IOOverrides.current;
        if (overrides == null) {
          return new _File(path);
        }
        return overrides.createFile(path);
   }
 
    ...
 
    Future<File> writeAsBytes(List<int> bytes,
      {FileMode mode = FileMode.write, bool flush = false});
}

由于File类是一个抽象接口,并没有writeAsBytes方法的具体实现,但是我们通过它的工厂构造方法可知,其具体实现的子类是_File,我们直接找到file_impl.dart文件,查看_File源码:

  Future<File> writeAsBytes(List<int> bytes,
      {FileMode mode = FileMode.write, bool flush = false}) {
    return open(mode: mode).then((file) {
      return file.writeFrom(bytes, 0, bytes.length).then<File>((_) {
        if (flush) return file.flush().then((_) => this);
        return this;
      }).whenComplete(file.close);
    });
  }

这里它又调用了内部的open函数,返回了一个file对象,并调用这个filewriteFrom()方法写入字节:

// 这里为了紧凑,删减部分代码
Future<RandomAccessFile> open({FileMode mode = FileMode.read}) {
    ...
    return _dispatchWithNamespace(
        _IOService.fileOpen, [null, _rawPath, mode._mode]).then((response) {
      _checkForErrorResponse(response, "Cannot open file", path);
      return _RandomAccessFile(response as int, path);
    });
}

通过返回值类型,我们知道file其实是一个_RandomAccessFile类的实例。注意,此类的源码也在file_impl.dart文件中,这里我们直接查看它的writeFrom实现:

// 删减部分代码
Future<RandomAccessFile> writeFrom(List<int> buffer,
      [int start = 0, int? end]) {
    ...
 
    List request = new List<dynamic>.filled(4, null);
    request[0] = null;
    request[1] = result.buffer;
    request[2] = result.start;
    request[3] = end - (start - result.start);
    return _dispatch(_IOService.fileWriteFrom, request).then((response) {
      _checkForErrorResponse(response, "writeFrom failed", path);
      _resourceInfo.addWrite(end! - (start - result.start));
      return this;
    });
}

此方法内部调用了一个_dispatch方法,我们通过方法名和参数名,大致可以猜测出此处应该是向所谓的_IOService派发了一条类型为_IOService.fileWriteFrom的请求消息,我们继续看一下这个方法的实现:

  Future<Object?> _dispatch(int request, List data, {bool markClosed = false}) {
    if (closed) {
      return new Future.error(new FileSystemException("File closed", path));
    }
    if (_asyncDispatched) {
      var msg = "An async operation is currently pending";
      return new Future.error(new FileSystemException(msg, path));
    }
    if (markClosed) {
      closed = true;
    }
    _asyncDispatched = true;
    data[0] = _pointer();
    return _IOService._dispatch(request, data).whenComplete(() {
      _asyncDispatched = false;
    });
  }

可以看到,此方法并没有太多处理,只是继续调用_IOService._dispatch静态方法派发请求消息。这里我们找到io_service.dart源文件,打开发现其并没有具体实现:

// 省略部分常量定义
class _IOService {
  ...
  static const int fileReadByte = 18;
  static const int fileWriteByte = 19;
  static const int fileRead = 20;
  static const int fileReadInto = 21;
  static const int fileWriteFrom = 22;
  ...
 
  external static Future<Object?> _dispatch(int request, List data);
}

看到这里,很多人可能就只能无奈放弃了,因为external修饰的方法一般是本地方法,也就是说该方法是由VM底层的C++来实现的。也许有人会去VM的C++源码中搜索,结果一无所获,因为C++代码中找不到名为_dispatch的函数或方法。

但是我看到此处,就发觉了不对劲,Dart层的方法也是必须与C++层的函数映射关联起来才能调用的,并不是简单的在方法名上面加个external修饰就大功告成的,否则Dart VM怎么知道你这个external函数到底对应哪个C++函数?如果写过Java的JNI代码,对此应该深有体会。另外,在Dart的2.14版本以前,是支持第三方开发者为Dart写本地扩展的,简单说就是写一个C++函数,然后映射到Dart层供人调用,这个机制与官方推荐的Dart FFI不同,但与Java JNI最为相似。我曾在dart 2.5版本上试验过本机扩展机制,里面在声明Dart 层的方法时,是需要明确指定映射到C++函数的名称的。Dart 2.15之后,本机扩展的官方文档被删除了,也就是不让第三方开发者使用此机制,但是Dart VM和Dart上层仍然是使用此机制交互的,学习Dart 本机扩展机制,是有利于我们剖析理解Dart VM底层的。这里我找到了官方文档的备份,想要探究本机扩展的,可以查看 独立 Dart VM 的本机扩展

现在回到我们的_dispatch方法,这里没有任何标记用于指定映射到C++的函数名,所以这行代码绝对是有问题的,不可能正确执行。我通过文件内容检索工具,检索了Dart SDK的全部代码,终于发现了其中的猫腻。

这里我找到了sdk\lib\_internal\vm\bin\io_service_patch.dart


class _IOService {
  static _IOServicePorts _servicePorts = new _IOServicePorts();
  static RawReceivePort? _receivePort;
  static late SendPort _replyToPort;
  static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
  static int _id = 0;
 
  
  static Future<Object?> _dispatch(int request, List data) {
    int id;
    do {
      id = _getNextId();
    } while (_messageMap.containsKey(id));
    final SendPort servicePort = _servicePorts._getPort(id);
    _ensureInitialize();
    final Completer completer = new Completer();
    _messageMap[id] = completer;
    try {
      servicePort.send(<dynamic>[id, _replyToPort, request, data]);
    } catch (error) {
      _messageMap.remove(id)!.complete(error);
      if (_messageMap.length == 0) {
        _finalize();
      }
    }
    return completer.future;
  }
  // ... 删除部分代码
}

我们发现,其实真正的_IOService实现代码被Dart SDK给隐藏了,并且还带有一定的误导,_dispatch方法根本就不是一个本机方法,就是一个普通的Dart方法而已。这里官方是通过补丁的方式,在编译时,将所有的@patch修饰的代码与前面公开的_IOService类进行替换或合并。简单说,最终真正的_IOService是将上面的两个_IOService实现合并起来的完整代码。这里,我还简单研究了一下@patch注解,这个注解并不是一个简单的注解,换句话说,它不是用我们熟知的Dart的注解生成器去做的注解解析,它的实现非常复杂,代码是在Dart的编译前端那个包。也就是说,它并不是去做Dart源码级别的处理,而是在源码解析之后,直接修改的AST,相当于是修改了中间产物,和闲鱼的那个AspectD框架类似。

这里真不得不吐槽一下Dart 官方的坑爹!

继续我们今天的源码剖析,_dispatch中的实现,实际上不是直接去调用C++的本机扩展函数,它是获取了一个Native层面的端口,然后向这个端口发消息,这里的端口通信和Dart层的Isolate端口通信是一样的。看到此处,请思考一个问题,这里为什么要进行端口通信,而不是直接调用C++层的扩展函数?

很简单,理由和我们的Isolate通信一样!这里肯定是为了跨线程,看到这里_dispatch的实现,我们就应该知道,Dart VM层肯定是起了一个工作线程,Dart层的调用和VM层的实现不在同一个线程了。

接下来,我们注意到final SendPort servicePort = _servicePorts._getPort(id);这行代码,它是从_servicePorts中获取一个发送消息的端口,让我们看看这个类的具体实现:

class _IOServicePorts {
  static const int maxPorts = 32;
  final List<SendPort> _ports = [];
  final List<int> _useCounts = [];
  final List<int> _freePorts = [];
  final Map<int, int> _usedPorts = HashMap<int, int>();
 
  _IOServicePorts();
 
  SendPort _getPort(int forRequestId) {
    assert(!_usedPorts.containsKey(forRequestId));
    if (_freePorts.isEmpty && _ports.length < maxPorts) {
      final SendPort port = _newServicePort();
      _ports.add(port);
      _useCounts.add(0);
      _freePorts.add(_ports.length - 1);
    }
    final index = _freePorts.isNotEmpty
        ? _freePorts.removeLast()
        : forRequestId % maxPorts;
    _usedPorts[forRequestId] = index;
    _useCounts[index]++;
    return _ports[index];
  }
 
  void _returnPort(int forRequestId) {
    final index = _usedPorts.remove(forRequestId)!;
    if (--_useCounts[index] == 0) {
      _freePorts.add(index);
    }
  }
 
  ("vm:external-name", "IOService_NewServicePort")
  external static SendPort _newServicePort();
}

整体代码很少,逻辑也很清晰,主要就是调用一个本机扩展方法_newServicePort(),在C++层面创建了一个接收消息的服务端口,然后把这个端口的SendPort保存起来复用。这里的_newServicePort才是一个真正的external方法,它使用@pragma注解将Dart层的方法声明与底层的IOService_NewServicePort函数名关联起来。至此,我们才终于有了继续向底层探索的线索!

我们继续在虚拟机的C++源码目录sdk\runtime\中搜索IOService_NewServicePort,我们可能会在sdk\runtime\bin\io_natives.cc中找到一些基于宏的声明,这些声明的目的主要是自动生成符合Dart 本机扩展机制的C++代码,源码里面的这些宏定义,主要就是为了简少编写一些模版代码的工作量。

#define IO_NATIVE_LIST(V)
...
  V(InternetAddress_RawAddrToString, 1)                                        \
  V(IOService_NewServicePort, 0)                                               \
  V(Namespace_Create, 2)                                                       \
...

这里函数名后面的数值,是代表函数的参数个数。这个函数的真正实现,是在sdk\sdk\runtime\bin\io_service.cc中:

namespace dart {
namespace bin {
 
#define CASE_REQUEST(type, method, id)                                         \
  case IOService::k##type##method##Request:                                    \
    response = type::method##Request(data);                                    \
    break;
 
void IOServiceCallback(Dart_Port dest_port_id, Dart_CObject* message) {
  Dart_Port reply_port_id = ILLEGAL_PORT;
  CObject* response = CObject::IllegalArgumentError();
  CObjectArray request(message);
  if ((message->type == Dart_CObject_kArray) && (request.Length() == 4) &&
      request[0]->IsInt32() && request[1]->IsSendPort() &&
      request[2]->IsInt32() && request[3]->IsArray()) {
    CObjectInt32 message_id(request[0]);
    CObjectSendPort reply_port(request[1]);
    CObjectInt32 request_id(request[2]);
    CObjectArray data(request[3]);
    reply_port_id = reply_port.Value();
    switch (request_id.Value()) {
      IO_SERVICE_REQUEST_LIST(CASE_REQUEST);
      default:
        UNREACHABLE();
    }
  }
 
  CObjectArray result(CObject::NewArray(2));
  result.SetAt(0, request[0]);
  result.SetAt(1, response);
  ASSERT(reply_port_id != ILLEGAL_PORT);
  Dart_PostCObject(reply_port_id, result.AsApiCObject());
}
 
Dart_Port IOService::GetServicePort() {
  return Dart_NewNativePort("IOService", IOServiceCallback, true);
}
 
void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
  Dart_SetReturnValue(args, Dart_Null());
  Dart_Port service_port = IOService::GetServicePort();
  if (service_port != ILLEGAL_PORT) {
    // Return a send port for the service port.
    Dart_Handle send_port = Dart_NewSendPort(service_port);
    Dart_SetReturnValue(args, send_port);
  }
}
 
}  // namespace bin
}  // namespace dart

可以看到整个io_service.cc中的代码并不多,逻辑也不难理解。IOService_NewServicePort函数首先调用IOService::GetServicePort()创建了一个本地端口,而GetServicePort()又调用了Dart_NewNativePort函数,这里的Dart_NewNativePort是Dart VM公开给第三方的虚拟机API,我们可以直接查看它在dart_native_api.h中的文档注释了解含义。注意,这里Dart_前缀的函数,都是VM公开的API,接着它又调用Dart_NewSendPort为这个本地端口创建了一个发送端口句柄,然后将发送端口句柄作为返回值进行了返回。我们看到IOService_NewServicePort函数似乎没有返回值,但是请注意,这里的返回值是对应上层的Dart函数声明的,我们再看一眼Dart端的函数声明:

  ("vm:external-name", "IOService_NewServicePort")
  external static SendPort _newServicePort();

所以,当调用_newServicePort()完之后,Dart 层就可以获得一个用于向底层发送消息的发送端口句柄。

Dart 侧的梳理

到这里,我们再来回顾梳理一下流程:

  1. Dart 层的File类是一个接口,具体实现是一个私有的_File子类

  2. _File子类也没有真正处理,它是对用起来更加繁琐的RandomAccessFile的简化封装

  3. RandomAccessFile也是一个接口,它的具体实现在私有的_RandomAccessFile子类中

  4. _RandomAccessFile类也不是最终目的地,它是通过调用_IOService._dispatch静态方法向虚拟机底层发消息的方式与VM中的C++方法进行交互

  5. 不同的消息类型,就代表了不同的IO操作:

    class _IOService {
      ...
      static const int fileReadByte = 18;
      static const int fileWriteByte = 19;
      static const int fileRead = 20;
      static const int fileReadInto = 21;
      static const int fileWriteFrom = 22;
      ...
     
      external static Future<Object?> _dispatch(int request, List data);
    }
    

总结,Dart 文件IO操作的真正实现是在VM的C++函数中。

C++ 侧

接下来,就只有一个关键问题需要搞明白了,那就是C++层是怎么接收并处理消息的?

这里我们再回看一个细节:

Dart_Port IOService::GetServicePort() {
  return Dart_NewNativePort("IOService", IOServiceCallback, true);
}

当使用Dart_NewNativePort创建一个本地端口时,它还注册了一个回调函数IOServiceCallback,我们仔细观察这个回调函数就会发现,它实际上就是Dart上层发来的消息处理器:

#define CASE_REQUEST(type, method, id)                                         \
  case IOService::k##type##method##Request:                                    \
    response = type::method##Request(data);                                    \
    break;
 
void IOServiceCallback(Dart_Port dest_port_id, Dart_CObject* message) {
  Dart_Port reply_port_id = ILLEGAL_PORT;
  CObject* response = CObject::IllegalArgumentError();
  CObjectArray request(message);
  if ((message->type == Dart_CObject_kArray) && (request.Length() == 4) &&
      request[0]->IsInt32() && request[1]->IsSendPort() &&
      request[2]->IsInt32() && request[3]->IsArray()) {
    CObjectInt32 message_id(request[0]);
    CObjectSendPort reply_port(request[1]);
    CObjectInt32 request_id(request[2]);
    CObjectArray data(request[3]);
    reply_port_id = reply_port.Value();
    switch (request_id.Value()) {
      IO_SERVICE_REQUEST_LIST(CASE_REQUEST);
      default:
        UNREACHABLE();
    }
  }
  CObjectArray result(CObject::NewArray(2));
  result.SetAt(0, request[0]);
  result.SetAt(1, response);
  ASSERT(reply_port_id != ILLEGAL_PORT);
  Dart_PostCObject(reply_port_id, result.AsApiCObject());
}

这个函数的参数是不是与external static Future<Object?> _dispatch(int request, List data)方法很相似?它前面的代码很好理解,其实就是对参数的提取和转换,最后得到的request_id就是消息类型,然后通过switch选择执行对应的函数。这里的IO_SERVICE_REQUEST_LIST()宏定义在sdk\runtime\bin\io_service.h文件中:

// This list must be kept in sync with the list in sdk/lib/io/io_service.dart
#define IO_SERVICE_REQUEST_LIST(V)                                             \
  ...
  V(File, ReadByte, 18)                                                        \
  V(File, WriteByte, 19)                                                       \
  V(File, Read, 20)                                                            \
  V(File, ReadInto, 21)                                                        \
  V(File, WriteFrom, 22)                                                       \
  ...
  V(SSLFilter, ProcessFilter, 43)
 
#define DECLARE_REQUEST(type, method, id) k##type##method##Request = id,

这里我们其实可以根据CASE_REQUEST宏把实际调用的C++函数名拼接出来。我们最开始调用的writeAsBytes()方法对应的消息类型是 static const int fileWriteFrom = 22,消息类型的值是22,这里正好对应宏定义中的V(File, WriteFrom, 22)。注意了,这里括号中的数值就不是参数个数了,而是赋值。

再根据CASE_REQUEST宏中response = type::method##Request(data);,我们拼出来的函数名应该是File_WriteFrom。如果你对C/C++中的宏不了解,你完全可以把它理解成纯粹的字符串替换,##号就是一个粘连符号。我继续在C++源码中搜索File_WriteFrom()函数的具体实现sdk\runtime\bin\file.cc

// 删除部分代码,精简结构
void FUNCTION_NAME(File_WriteFrom)(Dart_NativeArguments args) {
  File* file = GetFile(args);
  ...
  Dart_Handle buffer_obj = Dart_GetNativeArgument(args, 1);
  intptr_t start = DartUtils::GetNativeIntptrArgument(args, 2);
  intptr_t end = DartUtils::GetNativeIntptrArgument(args, 3);
  Dart_TypedData_Type type;
  intptr_t length = end - start;
  intptr_t buffer_len = 0;
  void* buffer = NULL;
  Dart_Handle result =
      Dart_TypedDataAcquireData(buffer_obj, &type, &buffer, &buffer_len);
  ...
  char* byte_buffer = reinterpret_cast<char*>(buffer);
  bool success = file->WriteFully(byte_buffer + start, length);
 
  if (!success) {
    Dart_SetReturnValue(args, DartUtils::NewDartOSError(&os_error));
  } else {
    Dart_SetReturnValue(args, Dart_Null());
  }
}

可以看到,文件的写入,其实就是调用的C++的File类进行操作的。也就是说,Dart层的所谓文件操作,其实就是把数据发送给C++函数,让C++干活。

看到此处,大家可能要质疑了,你前面不是说Dart的文件操作是在一个子线程进行的吗,所以才需要搞端口通信,但是这里没有看到线程呀?确实,这个地方是有一些绕的,并不是非常的直接,关键问题在于IOServiceCallback()这个回调是谁调用的?是在哪里调用的?

我们先找到Dart_NewNativePort函数的具体实现sdk\runtime\vm\native_api_impl.cc

DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
                                         Dart_NativeMessageHandler handler,
                                         bool handle_concurrently) {
  if (name == NULL) {
    name = "<UnnamedNativePort>";
  }
  if (handler == NULL) {
    OS::PrintErr("%s expects argument 'handler' to be non-null.\n",
                 CURRENT_FUNC);
    return ILLEGAL_PORT;
  }
  if (!Dart::SetActiveApiCall()) {
    return ILLEGAL_PORT;
  }
  IsolateLeaveScope saver(Isolate::Current());
  // 核心代码
  NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
  Dart_Port port_id = PortMap::CreatePort(nmh);
  if (port_id != ILLEGAL_PORT) {
    PortMap::SetPortState(port_id, PortMap::kLivePort);
    if (!nmh->Run(Dart::thread_pool(), NULL, NULL, 0)) {
      PortMap::ClosePort(port_id);
      port_id = ILLEGAL_PORT;
    }
  }
  Dart::ResetActiveApiCall();
  return port_id;
}

这里的核心代码,就是创建了一个NativeMessageHandler对象,然后调用了它的Run()方法。这里的NativeMessageHandler还持有了我们前面注册的IOServiceCallback回调。我们来看一下该类的声明,其完整的源码内容也很少 。

sdk\runtime\vm\native_message_handler.h

// NativeMessageHandler 接收消息并将它们分派给本机 C 处理程序
class NativeMessageHandler : public MessageHandler {
public:
  NativeMessageHandler(const char* name, Dart_NativeMessageHandler func);
  ~NativeMessageHandler();
 
  const char* name() const { return name_; }
  Dart_NativeMessageHandler func() const { return func_; }
 
  // ... 省略部分代码
 
private:
  char* name_;
  Dart_NativeMessageHandler func_;
};

该类继承自MessageHandler,所以真正的Run方法是在其父类中实现的。我们找到sdk\runtime\vm\message_handler.cc

bool MessageHandler::Run(ThreadPool* pool,
                         StartCallback start_callback,
                         EndCallback end_callback,
                         CallbackData data) {
  MonitorLocker ml(&monitor_);
  if (FLAG_trace_isolates) {
    OS::PrintErr(
        "[+] Starting message handler:\n"
        "\thandler:    %s\n",
        name());
  }
  ASSERT(pool_ == NULL);
  ASSERT(!delete_me_);
  pool_ = pool;
  start_callback_ = start_callback;
  end_callback_ = end_callback;
  callback_data_ = data;
  task_running_ = true;
  bool result = pool_->Run<MessageHandlerTask>(this);
  if (!result) {
    pool_ = nullptr;
    start_callback_ = nullptr;
    end_callback_ = nullptr;
    callback_data_ = 0;
    task_running_ = false;
  }
  return result;
}

这里关键的代码只有一行bool result = pool_->Run<MessageHandlerTask>(this),调用线程池来执行一个任务。关于Dart VM线程池的剖析,可以看我的另一篇剖析文章。这里的线程池对象,是Dart VM在初始化时创建的一个全局线程池Dart::thread_pool(),那么这里的线程池是在哪里创建的呢?同样,在上一篇线程池剖析的文章已经说明,请移步阅读 Dart VM 线程池剖析

继续我们的主题,线程池的Run函数是一个模版函数:

  template <typename T, typename... Args>
  bool Run(Args&&... args) {
    return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
  }

这里的T我们理解成Dart的泛型即可,那么这里的new T(std::forward<Args>(args)...)其实就是new MessageHandlerTask(),封装一个任务然后把参数透传进去,所以接下来要看看MessageHandlerTask的声明以及构造方法:

sdk\runtime\vm\message_handler.cc

class MessageHandlerTask : public ThreadPool::Task {
public:
  explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
    ASSERT(handler != NULL);
  }
 
  virtual void Run() {
    ASSERT(handler_ != NULL);
    handler_->TaskCallback();
  }
 
private:
  MessageHandler* handler_;
 
  DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
};

可见,MessageHandlerTask继承自线程池的ThreadPool::Task,在上篇关于线程池的剖析文章中我们知道,当一个任务Task对象被线程池调度执行时,其实就是调用TaskRun方法,所以这里的MessageHandlerTask任务被执行时,其Run方法被工作线程执行。

那么这里的handler是什么呢?其实就是前面调用pool_->Run<MessageHandlerTask>(this);传进去的this指针,也就是NativeMessageHandler类实例的指针。而NativeMessageHandler类没有实现TaskCallback()方法,这里其实是调用的父类实现,最后我们来看看MessageHandler中该方法的具体实现:

void MessageHandler::TaskCallback() {
   ...
 
      // Handle any pending messages for this message handler.
      if (status != kShutdown) {
        status = HandleMessages(&ml, (status == kOK), true);
      }
    }
 
    ...
}
 
MessageHandler::MessageStatus MessageHandler::HandleMessages(
    MonitorLocker* ml,
    bool allow_normal_messages,
    bool allow_multiple_normal_messages) {
  ...
  Message::Priority min_priority =
      ((allow_normal_messages && !paused()) ? Message::kNormalPriority
                                            : Message::kOOBPriority);
  std::unique_ptr<Message> message = DequeueMessage(min_priority);
  while (message != nullptr) {
    ...
    {
      DisableIdleTimerScope disable_idle_timer(idle_time_handler);
      status = HandleMessage(std::move(message));
    }
    ...
}

以上方法省略大量代码,只保留关键代码。首先是在TaskCallback()中调用了本类的HandleMessages()方法,在HandleMessages()中,又调用了一个虚函数HandleMessage()。注意,这两个方法一个带有s结尾,一个没有:

  virtual MessageStatus HandleMessage(std::unique_ptr<Message> message) = 0;

既然是虚函数,那么肯定是交给子类去实现的,这里我们到子类NativeMessageHandler中找实现:

MessageHandler::MessageStatus NativeMessageHandler::HandleMessage(
    std::unique_ptr<Message> message) {
  if (message->IsOOB()) {
    UNREACHABLE();
  }
  ApiNativeScope scope;
  Dart_CObject* object = ReadApiMessage(scope.zone(), message.get());
  (*func())(message->dest_port(), object);
  return kOK;
}

到这里,我们终于找到了(*func())(message->dest_port(), object);这行代码,还记得func是什么吗?它就是我们通过Dart_NewNativePort注册的回调函数的指针,这里就是真正的调用IOServiceCallback回调的地方。这里传的参数也与IOServiceCallback回调的完全一致。

C++ 侧的梳理

简单回顾梳理一下C++ 端的流程:

  1. 响应Dart层的_newServicePort()方法,C++侧对应的函数是IOService_NewServicePort()
  2. 调用Dart_NewNativePort函数创建本地端口
  3. 在创建本地端口的同时还创建了一个NativeMessageHandler对象,并传入了一个处理消息的回调函数IOServiceCallback
  4. 调用NativeMessageHandlerRun方法,将消息处理封装成了一个线程池的任务
  5. 在工作线程中执行TaskCallback()函数
  6. 通过一些封装的调用,最终执行处理消息的回调函数IOServiceCallback()

至此,我们彻底搞明白了Dart 文件IO的底层细节,明确了Dart的文件操作都是在C++的工作线程中完成的,当工作线程执行完了对应的文件操作,就会向Dart的单线程模型返回结果。这就说明,在Dart层面做应用开发,是不需要担心文件操作耗时会阻塞Dart的主线程的,因为虚拟机底层已经帮你开辟了子线程。

总结

画一个示意图做总结:


关注公众号:编程之路从0到1

文章来源:https://blog.csdn.net/yingshukun/article/details/135673655
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。