我们 GreptimeDB?把 OpenDAL?作为统一的数据访问层。前段时间同事告诉我:数据库执行 `Copy From` 语句从 S3 导入一个 800 KiB 的 Parquet 文件需要 10s;经过一些调查,又研读了相关 `Reader` 的文档和具体实现后(暴露了之前没有 RTFSC 🥲);目前我们用一个快速修复把导入时间优化到了 1s 内,在后续会为上游实现 `BufferReader`?并做进一步的优化。谨以本文做一个记录和简单的总结。
本文涉及的 OpenDAL 源码 Commit:6980cd1
坦白说,我也是最近才理清楚 OpenDAL 的源码和其调用关系,之前都是一知半解。
我们所有的 IO 操作都是围绕着 `Operator`?展开的,先来看下 `Operator` 是怎么构建的。 以 `main.rs` 为例,首先我们创建了一个基于文件系统的 `Backend Builder`; 随后将其构建为 `accessor`(实现了 `Accessor trait`); 我们又将该 `accessor` 传入了 `OperatorBuilder::new`,最后调用了 `finish`。
OpenDAL 通过 `Accessor trait` 统一了不同存储后端(Backend)的行为,并向上层暴露统一的 IO 接口,例如 `create_dir`, `read`, `write` 等。
use opendal::services::Fs;
use opendal::Operator;
#[tokio::main]
async fn main() -> Result<()> {
? ? // Create fs backend builder.
? ? let mut builder = Fs::default();
? ? // Set the root for fs, all operations will happen under this root.
? ? //
? ? // NOTE: the root must be absolute path.
? ? builder.root("/tmp");
? ? let accessor = builder.build()?;
? ? let op: Operator = OperatorBuilder::new(accessor)?.finish();
? ? Ok(())
}
我们传入的 `accessor` 在调用 `new` 时,被追加了两层 `Layer`,并在调用 `finish` 时,又被追加了一层内部 `Layer`。追加 `Layer` 后,当我们调用 `Operator` 暴露出来的接口时,调用会从最外层 `CompleteLayer` 开始,并最终抵达最内层 `FsAccessor`。
?
FsAccessor
ErrorContextLayer
CompleteLayer
^
|
| Invoking (`read`, `reader_with`, `stat`...)
```
```rust
impl<A: Accessor> OperatorBuilder<A> {
? ? /// Create a new operator builder.
? ? #[allow(clippy::new_ret_no_self)]
? ? pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
? ? ? ? // Make sure error context layer has been attached.
? ? ? ? OperatorBuilder { accessor }
? ? ? ? ? ? .layer(ErrorContextLayer)
? ? ? ? ? ? .layer(CompleteLayer)
? ? }
? ? ...
? ??
? ? /// Finish the building to construct an Operator.
? ? pub fn finish(self) -> Operator {
? ? ? ? let ob = self.layer(TypeEraseLayer);
? ? ? ? Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
? ? }
}
TL;DR 说了半天其实想强调一下,代码应该从 CompleteLayer 开始读(顿悟
这里我们补充一些必要的上下文信息,以便理解后文内容。
目前,在查询场景,我们追加了一层 `LruCacheLayer`,那么我们 `Operator` 就如下图所示:
S3Accessor ? ? ? ? ? ? ? ?FsAccessor
ErrorContextLayer ? ? ? ? ErrorContextLayer
CompleteLayer ? ? ? ? ? ? CompleteLayer
? ? ^ ? ? ? ? ? ? ? ? ? ? ? ? ^ ?|
? ? | ? ? ? ? ? ? ? ? ? ? ? ? | ?|
? ? |`inner` ? ? ? ? ? `cache`| ?|
? ? | ? ? ? ? ? ? ? ? ? ? ? ? | ?|
? ? | ? ? ? ? ? ? ? ? ? ? ? ? | ?|
? ? | ? ? ? ? ? ? ? ? ? ? ? ? | ?|
? ? +----- LruCacheLayer -----+ ?|
? ? ? ? ? ? ? ? ?^ ? ? ? ? ? ? ? |
? ? ? ? ? ? ? ? ?| ? ? ? ? ? ? ? |
? ? ? ? ? ? ? ? ?| ? ? ? ? ? ? ? |
? ? ? ? ? ? ? ? ?| ? ? ? ? ? ? ? v
? ? ? ? ? ? ? ? ?| ? ? ? ? ? ? ? FileReader::new(oio::TokioReader<tokio::fs::File>)
? ? ? ? ? ? ? ? ?|
? ? ? ? ? ? ? ? ?Invoking(`reader`, `reader_with`)
以 `read` 接口为例,`LruCacheLayer` 会将 S3 的文件缓存到文件系统中, 并向上层返回缓存的基于文件系统的 `Box<dyn oio::Read>`(`FileReader::new(oio::TokioReader<tokio::fs::File>)`); 当然如果读取的文件不存在于缓存时,则先全量从 S3 加载文件至本地的文件系统中。
struct LruCacheLayer {
? inner: Operator, // S3Backend
? cache: Operator, // FsBackend
? index: CacheIndex
}
impl LayeredAccessor for LruCacheLayer {
? ...
? async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
? ? ? ? if self.index.hit(path, args) {
? ? ? ? ? // Returns `Box<dyn oio::Read>`
? ? ? ? ? self.cache.read(path, args).await?
? ? ? ? } else {
? ? ? ? ? // Fetches cache and stores...
? ? ? ? }
? }
? ...
}
在 Copy From 场景,我并没有加这一层 ?`LruCacheLayer`。那么我们 `Operator` 就如下图所示:
S3Accessor
ErrorContextLayer
CompleteLayer
? ?▲ ? ?│
? ?│ ? ?│
? ?│ ? ?│
? ?│ ? ?▼
? ?│ ? ?RangeReader::new(IncomingAsyncBody)
? ?│
? ?Invoking (`reader`, `reader_with`)
在 `Copy From` 中,我们拿到文件信息后,首先会调用 `operator.reader` 返回一个实现 `AsyncReader + AsyncSeek` 的 `reader`,再套一层 `BufReader`; 最终将该 `reader` 传入至 `ParquetRecordBatchStreamBuilder` 中。
这里面 `BufReader` 也是多此一举,`BufReader` 每一次 seek 后都会清空内部缓存区,所以其实没有获得任何性能上的收益。
? ...
? let reader = operator
? ? ? .reader(path)
? ? ? .await
? ? ? .context(error::ReadObjectSnafu { path })?;
? let buf_reader = BufReader::new(reader.compat());
? let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
? ? ? .await
? ? ? .context(error::ReadParquetSnafu)?;
? let upstream = builder
? ? ? .build()
? ? ? .context(error::BuildParquetRecordBatchStreamSnafu)?;
??
? ...
读取元信息逻辑如下,首先调用 `seek(SeekFrom::End(-FOOTER_SIZE_I64))` ,读取 `FOOTER_SIZE` 字节后解析出 `metadata_len`; 随后再一次调用 `seek`,并读取 `metadata_len` 字节后解析出元信息。
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
? ? fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
? ? ? ? const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
? ? ? ? async move {
? ? ? ? ? ? self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;
? ? ? ? ? ? let mut buf = [0_u8; FOOTER_SIZE];
? ? ? ? ? ? self.read_exact(&mut buf).await?;
? ? ? ? ? ? let metadata_len = decode_footer(&buf)?;
? ? ? ? ? ? self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
? ? ? ? ? ? ? ? .await?;
? ? ? ? ? ? let mut buf = Vec::with_capacity(metadata_len);
? ? ? ? ? ? self.take(metadata_len as _).read_to_end(&mut buf).await?;
? ? ? ? ? ? Ok(Arc::new(decode_metadata(&buf)?))
? ? ? ? }
? ? ? ? .boxed()
? ? }
}
到上面为止,都是一些小问题。真正比较棘手的问题发生在这里,这里变量 `stream` 就是我们上面构建的 `ParquetRecordBatchStream`,当我们调用 `next` 时,`ParquetRecordBatchStream` 会调用多次 `reader` (`RangeReader`)的 `seek` 和 `read`。 然而每次调用 `seek` 都会重置 `RangeReader` 的内部状态(丢弃掉之前的字节流),并在下次调用 `read` 时,重新发起一个远程请求(后端为 S3 的场景)。(相关请参考 issue?和?讨论)
`ParquetRecordBatchStream` 在取回每列数据时:会先调用 `RangeReader seek` ,随后调用 `read` 读取一些字节。那么总共需要发起的远程调用次数为 `RowGroup` 数乘上 `RowGroup` 内列的数。 我们 800KiB 包含了 50 个 `RowGroup` 和 12 列,也就是发起了 600 次 S3 get 请求!
? ? pub async fn copy_table_from(
? ? ...
? ? ? ? ? ? while let Some(r) = stream.next().await {
? ? ? ? ? ? ? ? let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
? ? ? ? ? ? ? ? let vectors =
? ? ? ? ? ? ? ? ? ? Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;
? ? ? ? ? ? ? ? pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();
? ? ? ? ? ? ? ? let columns_values = fields
? ? ? ? ? ? ? ? ? ? .iter()
? ? ? ? ? ? ? ? ? ? .cloned()
? ? ? ? ? ? ? ? ? ? .zip(vectors)
? ? ? ? ? ? ? ? ? ? .collect::<HashMap<_, _>>();
? ? ? ? ? ? ? ? pending.push(self.inserter.handle_table_insert(
? ? ? ? ? ? ? ? ? ? InsertRequest {
? ? ? ? ? ? ? ? ? ? ? ? catalog_name: req.catalog_name.to_string(),
? ? ? ? ? ? ? ? ? ? ? ? schema_name: req.schema_name.to_string(),
? ? ? ? ? ? ? ? ? ? ? ? table_name: req.table_name.to_string(),
? ? ? ? ? ? ? ? ? ? ? ? columns_values,
? ? ? ? ? ? ? ? ? ? },
? ? ? ? ? ? ? ? ? ? query_ctx.clone(),
? ? ? ? ? ? ? ? ));
? ? ? ? ? ? ? ? if pending_mem_size as u64 >= pending_mem_threshold {
? ? ? ? ? ? ? ? ? ? rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ??
? ? ...
`RangeReader` 其 `self.state` 初始值为 `State::Idle`,首先我们假设 `self.offset` 为 `Some(0)`; 随后 `self.state` 被设置为 `State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)`, 并再次调用 `self.poll_read(cx, buf)`。
impl<A, R> oio::Read for RangeReader<A, R>
where
? ? A: Accessor<Reader = R>,
? ? R: oio::Read,
{
? ? fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
? ? ? ? ...
? ? ? ? match &mut self.state {
? ? ? ? ? ? State::Idle => {
? ? ? ? ? ? ? ? self.state = if self.offset.is_none() {
? ? ? ? ? ? ? ? ? ? // Offset is none means we are doing tailing reading.
? ? ? ? ? ? ? ? ? ? // we should stat first to get the correct offset.
? ? ? ? ? ? ? ? ? ? State::SendStat(self.stat_future())
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? State::SendRead(self.read_future())
? ? ? ? ? ? ? ? };
? ? ? ? ? ? ? ? self.poll_read(cx, buf)
? ? ? ? ? ? }
? ? ? ? ? ? ...
? ? ? ? }
? ? }
}
显而易见,`self.read_future()` 返回了一个 `BoxedFuture`;在 `BoxedFuture` 中调用底层的 `Accessor` 的 `read` 接口(`acc.read(&path, op).await`)。 `Accessor` 可以是 S3 的存储后端实现,也可以是 OSS 实现等;在我们场景中,这个 `Accessor` 是 S3 存储后端,那么当它的 `read` 接口被调用时,会建立取回文件的 TCP 连接,将来自 S3 的响应以字节流的形式返回给上层。
impl<A, R> RangeReader<A, R>
where
? ? A: Accessor<Reader = R>,
? ? R: oio::Read,
{
? ? fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
? ? ? ? let acc = self.acc.clone();
? ? ? ? let path = self.path.clone();
? ? ? ? let mut op = self.op.clone();
? ? ? ? // cur != 0 means we have read some data out, we should convert
? ? ? ? // the op into deterministic to avoid ETag changes.
? ? ? ? if self.cur != 0 {
? ? ? ? ? ? op = op.into_deterministic();
? ? ? ? }
? ? ? ? // Alter OpRead with correct calculated range.
? ? ? ? op = op.with_range(self.calculate_range());
? ? ? ? Box::pin(async move { acc.read(&path, op).await })
? ? }
? ? ...
}
到此为止,`poll_read` 还没有返回;在上文中 `self.poll_read()` 被再次调用,此时 `self.state` 为 `State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>)`。 这里的 `ready!(Pin::new(fut).poll(cx))` 返回值就是上文中 `acc.read(&path, op).await` 调用的返回值。(对于 S3 存储后端,远程调用发生在这里) 随后内部状态 `self.state` 被设置为 `State::Read(r)`,并再次调用 `self.poll_read()`。 再次进入 `self.poll_read()` 时,`RangeReader` 内部状态被设置为 `State::Reader(R)`。 这里的 `R(r)` 便是读取请求响应的字节流,对于 S3 存储后端,`Pin::new(r).poll_read(cx, buf)` 将 TCP 缓冲区的字节数据写入到上层应用中。
impl<A, R> oio::Read for RangeReader<A, R>
where
? ? A: Accessor<Reader = R>,
? ? R: oio::Read,
{
? ? fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
? ? ? ? // Sanity check for normal cases.
? ? ? ? if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
? ? ? ? ? ? return Poll::Ready(Ok(0));
? ? ? ? }
? ? ? ? match &mut self.state {
? ? ? ? ? ? ...
? ? ? ? ? ? State::SendRead(fut) => {
? ? ? ? ? ? ? ? let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
? ? ? ? ? ? ? ? ? ? // If read future returns an error, we should reset
? ? ? ? ? ? ? ? ? ? // state to Idle so that we can retry it.
? ? ? ? ? ? ? ? ? ? self.state = State::Idle;
? ? ? ? ? ? ? ? ? ? err
? ? ? ? ? ? ? ? })?;
? ? ? ? ? ? ? ? // Set size if read returns size hint.
? ? ? ? ? ? ? ? if let Some(size) = rp.size() {
? ? ? ? ? ? ? ? ? ? if size != 0 && self.size.is_none() {
? ? ? ? ? ? ? ? ? ? ? ? self.size = Some(size + self.cur);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? self.state = State::Read(r);
? ? ? ? ? ? ? ? self.poll_read(cx, buf)
? ? ? ? ? ? }
? ? ? ? ? ? State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
? ? ? ? ? ? ? ? Ok(0) => {
? ? ? ? ? ? ? ? ? ? // Reset state to Idle after all data has been consumed.
? ? ? ? ? ? ? ? ? ? self.state = State::Idle;
? ? ? ? ? ? ? ? ? ? Poll::Ready(Ok(0))
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Ok(n) => {
? ? ? ? ? ? ? ? ? ? self.cur += n as u64;
? ? ? ? ? ? ? ? ? ? Poll::Ready(Ok(n))
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Err(e) => {
? ? ? ? ? ? ? ? ? ? self.state = State::Idle;
? ? ? ? ? ? ? ? ? ? Poll::Ready(Err(e))
? ? ? ? ? ? ? ? }
? ? ? ? ? ? },
? ? ? ? }
? ? }
}
还记得刚才我们 `RangeReader` 内部状态吗?没错,是 `State::Reader(R)`。 如果我们在 `read` 之后在调用 `seek`,`RangeReader` 内部的字节流会被丢弃,状态重新设置为 `State::Idle`。 也就是说,在每次 `seek` 调用后再次调用 `read`,`RangeReader` 便会请求底层 `Accessor` 的 `read` 接口 `(acc.read(&path, op).await)` 发起一个远程调用,返回一个包含 `[Pos, size)` 的 `Reader`;对于 S3 存储后端,调用这个接口的开销是非常昂贵的(TTFB 通常高达百毫秒)。
另外还有一个性能相关的重点,当我们尝试 `SeekFrom::End()`` 的时,且 `self.size` 未知时,会有一次额外的 `stat` 操作。 `self.poll_seek()` 调用后 `self.cur` 会被设置为 `base.checked_add(amt)`。