Iceberg:浅析基于Snapshot的事务过程

发布时间:2023年12月24日

MVCC事务(乐观锁)

我们知道,Iceberg基于Snapshot(快照机制)实现了乐观无锁地数据并发读写能力(MVCC,Multi Versions Concurrency Controll),默认提供了快照级别的事务隔离,因此可以至少避免脏读的问题。

SQL-92 标准定义了如下 4 种隔离级别:

  1. 读未提交:会存在脏读、幻读、不可重复读等问题。
  2. 读已提交:会存在幻读、不可重复读等问题。
  3. 可重复读:存在幻读的问题等问题。
  4. 串行事务:最高级别的隔离,可以避免一切由于事务并发导致的问题。

幻读,是对于并发事务的INSERT/DELETE操作,会导致连续读取相同区间的数据记录时,数据条数不同。
不可重复读,是对于并发事务的UPDATE操作,会导致连续读取相同数据记录的数据发生了变化。

并发写

基于快照实现方案是解决事务并的方式之一,可以达到可重复读、串行,但会导致另外一个数据更新丢失问题,即写偏斜问题

事务A开始,读取字段X,更新字段Y,事务结束。
事务B开始,读取字段Y,更新字段X,事务结束。

上面两个并发事务A/B,会基于同一个快照读取分别读取了X、Y的值,同时分别更新了Y、X,完成自己的事务,由于这两个事务在写的时候不会产生数据更新的重叠问题,因此都可以成功,产生的结果就是X、Y的值被互换了
但如果事务是串行的话,不论是A事务先执行还是B事务,最终的结果则是X的值=Y的值

并发读

实际上,基于快照的读又分为两种情况,产生的结果也不相同:
基于旧快照: 由于旧的快照数据不可能再发生变化,因此可以避免一切并发导致的读问题。
基于当前快照: 由于最新快照会被写事务更新,因此不可避免地会导致幻读、不可重复读的问题。

Iceberg中的实现

事务测试

下面的测试演示了一个事务上的多次更新,每一次从对象创建的快照更新对象的commit,都会产生一个新的事务ID,但这些新产生的Snapshot都归属于创建事务时绑定的起始TableMetadata对象。

一个表会唯一绑定一个最新的TableMetadata实例,而此TableMetadata则包含子所有的历史Snapshots。
对于非事务的快照更新操作,一次commit就会产生一个新的Snapshot,同时也会产生一个新的TableMetadata。
而对于事务的更新,由于一个事务期间可能会基于起始TableMetadata的当前快照,执行一次或多次commit,因此这个TableMetadata包含所有的新生产的Snapshots。

  public void testMultipleOperationTransaction() {
    Assert.assertEquals("Table should be on version 0", 0, (int) version());
    // 先向表中追加一个文件,FILE_C
    table.newAppend().appendFile(FILE_C).commit();
    List<HistoryEntry> initialHistory = table.history();
    // 获取当前的Metadata
    TableMetadata base = readMetadata();
    // 创建一个事务实例
    Transaction txn = table.newTransaction();
    // 验证当前表的Metadata是的版本号为1
    Assert.assertSame(
        "Base metadata should not change when commit is created", base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
    // 从事务对象,创建一个AppendFiles的实例,即MergeAppend的对象
    // 并追加两个新文件FILE_A、FILE_B
    txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
    // 前面的commit()方法,并不会更新表的TaleMetadata
    Assert.assertSame(
        "Base metadata should not change when commit is created", base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after txn create", 1, (int) version());
    // 获取表的追加文件后的最新快照信息
    Snapshot appendSnapshot = txn.table().currentSnapshot();
    // 通过事务对象创建一个文件删除器,即StreamingDelete的实例
    // 并删除FILE_A
    txn.newDelete().deleteFile(FILE_A).commit();
    // 获取表的删除文件后的最新快照信息
    Snapshot deleteSnapshot = txn.table().currentSnapshot();
    // 但表对应的最新TableMetadata实例依然没有变化
    Assert.assertSame(
        "Base metadata should not change when an append is committed", base, readMetadata());
    Assert.assertEquals("Table should be on version 1 after append", 1, (int) version());
    // 提交事务
    txn.commitTransaction();
    // TableMetadata发生了变化,版本号更新到了2
    Assert.assertEquals("Table should be on version 2 after commit", 2, (int) version());
    // 由于前面通过非事务操作提交一了一次数据文件,又通过事务提交了一次,因此会产生2个Manifest Files
    Assert.assertEquals(
        "Table should have two manifest after commit",
        2, readMetadata().currentSnapshot().allManifests(table.io()).size());
    // 由于最后一次的提交操作是delete file,因此产生的最后一个Snapshot就是deleteSnapshot
    Assert.assertEquals(
        "Table snapshot should be the delete snapshot",
        deleteSnapshot,
        readMetadata().currentSnapshot());
    validateManifestEntries(
        readMetadata().currentSnapshot().allManifests(table.io()).get(0),
        ids(deleteSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B),
        statuses(Status.DELETED, Status.EXISTING));
    // 表最新的TableMetadata应该包含3个历史Snapshots,一个是通过非事务提交产生的;
    // 另外两个是在事务期间提交产生的。
    Assert.assertEquals(
        "Table should have a snapshot for each operation", 3, readMetadata().snapshots().size());
    validateManifestEntries(
        readMetadata().snapshots().get(1).allManifests(table.io()).get(0),
        ids(appendSnapshot.snapshotId(), appendSnapshot.snapshotId()),
        files(FILE_A, FILE_B),
        statuses(Status.ADDED, Status.ADDED));
    // 验证所有的历史提交
    Assertions.assertThat(table.history()).containsAll(initialHistory);
}

创建事务

支持创建、替换以及通用的事务创建。

public final class Transactions {
  private Transactions() {}

  public static Transaction createOrReplaceTableTransaction(
      String tableName, TableOperations ops, TableMetadata start) {
    return new BaseTransaction(tableName, ops, TransactionType.CREATE_OR_REPLACE_TABLE, start);
  }

  public static Transaction replaceTableTransaction(
      String tableName, TableOperations ops, TableMetadata start) {
    return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, start);
  }

  public static Transaction replaceTableTransaction(
      String tableName, TableOperations ops, TableMetadata start, MetricsReporter reporter) {
    return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, start, reporter);
  }

  public static Transaction createTableTransaction(
      String tableName, TableOperations ops, TableMetadata start) {
    Preconditions.checkArgument(
        ops.current() == null, "Cannot start create table transaction: table already exists");
    return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE, start);
  }

  public static Transaction createTableTransaction(
      String tableName, TableOperations ops, TableMetadata start, MetricsReporter reporter) {
    Preconditions.checkArgument(
        ops.current() == null, "Cannot start create table transaction: table already exists");
    return new BaseTransaction(tableName, ops, TransactionType.CREATE_TABLE, start, reporter);
  }

  public static Transaction newTransaction(String tableName, TableOperations ops) {
    return new BaseTransaction(tableName, ops, TransactionType.SIMPLE, ops.refresh());
  }

  public static Transaction newTransaction(
      String tableName, TableOperations ops, MetricsReporter reporter) {
    return new BaseTransaction(tableName, ops, TransactionType.SIMPLE, ops.refresh(), reporter);
  }
}

事务对象的实例化

public class BaseTransaction implements Transaction {
  private final String tableName;
  // 保存实际的表操作集合对象
  private final TableOperations ops;
  private final TransactionTable transactionTable;
  private final TableOperations transactionOps;
  // 保存所有的从事务创建出来的SnapshotUpdate的实现类的对象
  private final List<PendingUpdate> updates;
  // 保存事物期间删除的所有文件路径
  private final Set<String> deletedFiles =
      Sets.newHashSet(); // keep track of files deleted in the most recent commit
  private final Consumer<String> enqueueDelete = deletedFiles::add;
  private TransactionType type;
  private TableMetadata base;
  private TableMetadata current;
  private boolean hasLastOpCommitted;
  private final MetricsReporter reporter;
  
  BaseTransaction(
      String tableName,
      TableOperations ops,
      TransactionType type,
      TableMetadata start,
      MetricsReporter reporter) {
    this.tableName = tableName;
    this.ops = ops;
    // 创建一个事务类开的表对象,提供表相关的事务方法
    this.transactionTable = new TransactionTable();
    // 保存事务开始时的TableMetadata
    this.current = start;
    // 事务表上的操作集,可以支持
    this.transactionOps = new TransactionTableOperations();
    this.updates = Lists.newArrayList();
    // 保存最新的TableMetadata,有可能与start不同,但版本号大于等待start
    this.base = ops.current();
    this.type = type;
    this.hasLastOpCommitted = true;
    this.reporter = reporter;
  }

  /**
   * 在BaseTransaction实例构造时,会实例化一个此类的实例,它是对原始TableOperations的封装,例如HiveTableOperations,
   * 以支持从此事务创建的具体的操作的提交行为。
   */
  public class TransactionTableOperations implements TableOperations {
    private TableOperations tempOps = ops.temp(current);

    @Override
    public TableMetadata current() {
      return current;
    }

    @Override
    public TableMetadata refresh() {
      return current;
    }

    @Override
    @SuppressWarnings("ConsistentOverrides")
    public void commit(TableMetadata underlyingBase, TableMetadata metadata) {
      // underlyingBase指向的是最新的TableMetadata,
      // metata指向的是事务创建时绑定的起始TableMetadata
      if (underlyingBase != current) {
        // 如果最新的TableMetadata发生了改变,那么就需要触发更新当前事务所绑定的TableMetadata到最新版本,以保证不会将事务期间产生的变更更新到旧的Metadata上
        // trigger a refresh and retry
        throw new CommitFailedException("Table metadata refresh is required");
      }
      // 当事务引出的SnapshotUpdate对象commit时,所有变更提交到的TableMeatadata实例与事务绑定的最新的实例相同,
      // 则更新当前事务的各种引用,并不会真正提交变更到表
      BaseTransaction.this.current = metadata;

      this.tempOps = ops.temp(metadata);

      BaseTransaction.this.hasLastOpCommitted = true;
    }

    @Override
    public FileIO io() {
      return tempOps.io();
    }

    @Override
    public EncryptionManager encryption() {
      return tempOps.encryption();
    }

    @Override
    public String metadataFileLocation(String fileName) {
      return tempOps.metadataFileLocation(fileName);
    }

    @Override
    public LocationProvider locationProvider() {
      return tempOps.locationProvider();
    }

    @Override
    public long newSnapshotId() {
      return tempOps.newSnapshotId();
    }
  }
}

事务中的ApendFiles

创建实例

对应于前文,事务测试章节,提到的如下代码行
txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

public class BaseTransaction implements Transaction {

  @Override
  public AppendFiles newAppend() {
    checkLastOperationCommitted("AppendFiles");
    // 创建一个新的AppendFiles实例,注意到这里为此实例绑定的TableOperations是带事务的,
    // 
    AppendFiles append = new MergeAppend(tableName, transactionOps).reportWith(reporter);
    // 为append添加一个回调函数,当删除文件成功后,就将文件路径记录到当前事务维护的队列中
    append.deleteWith(enqueueDelete);
    // 添加 从事务新创建的SnapshotUpdate类型的对象 到缓存队列
    updates.add(append);
    return append;
  }
}

检查变更的合法性

所有继承自SnapshotProducer的实例类,可以实现自己的验证逻辑,其接口定义如下:

abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {

  /**
   * Validate the current metadata.
   *
   * <p>Child operations can override this to add custom validation.
   *
   * @param currentMetadata current table metadata to validate
   * @param snapshot ending snapshot on the lineage which is being validated
   */
  protected void validate(TableMetadata currentMetadata, Snapshot snapshot) {}
}

例如对于BaseRewriteFiles的更新,它的实现如下:

  @Override
  protected void validate(TableMetadata base, Snapshot parent) {
    if (replacedDataFiles.size() > 0) {
      // if there are replaced data files, there cannot be any new row-level deletes for those data
      // files
      validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent);
    }
  }

提交变更

MergeAppend类没有覆盖父类的commit()方法,故调用的实际上是SnapshotProducer::commit()方法,它与非事务模式下的Snapshot生成及更新过各相同,只是在将新的Snapshot应用到TableMetadata时,会将apply(TableMetadata base, Snapshot snapshot)方法路由到父类MergingSnapshotProducer,具体的代码见下:

class MergeAppend extends MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
  
  /**
   * 将新生产的Snapshot应用到目标TableMetadata。
   */
  @Override
  public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
    // filter any existing manifests
    // 过滤出可以从当前Snapshot可以访问的Manifest Files
    List<ManifestFile> filtered =
        filterManager.filterManifests(
            SnapshotUtil.schemaFor(base, targetBranch()),
            snapshot != null ? snapshot.dataManifests(ops.io()) : null);
    // 获取最小的数据文件的序列号,由于TableMetadata记录的lastSequenceNumber并不一定与ManifestFile记录的所有合法文件的最小序列号相同,
    // 故在清理垃圾文件时需要找到最小的序列号
    long minDataSequenceNumber =
        filtered.stream()
            .map(ManifestFile::minSequenceNumber)
            .filter(
                seq ->
                    seq
                        != ManifestWriter
                            .UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
            .reduce(base.lastSequenceNumber(), Math::min);
    // 删除所有序列号小于minDataSequenceNumber的Delete files,这些文件后续也不会被用到
    deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
    List<ManifestFile> filteredDeletes =
        deleteFilterManager.filterManifests(
            SnapshotUtil.schemaFor(base, targetBranch()),
            snapshot != null ? snapshot.deleteManifests(ops.io()) : null);

    // only keep manifests that have live data files or that were written by this commit
    Predicate<ManifestFile> shouldKeep =
        manifest ->
            manifest.hasAddedFiles()
                || manifest.hasExistingFiles()
                || manifest.snapshotId() == snapshotId();
    Iterable<ManifestFile> unmergedManifests =
        Iterables.filter(Iterables.concat(prepareNewManifests(), filtered), shouldKeep);
    Iterable<ManifestFile> unmergedDeleteManifests =
        Iterables.filter(Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);

    // update the snapshot summary
    summaryBuilder.clear();
    summaryBuilder.merge(addedFilesSummary);
    summaryBuilder.merge(appendedManifestsSummary);
    summaryBuilder.merge(filterManager.buildSummary(filtered));
    summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));
    // 将所有当前Snapshot产生的数据文件或删除文件,与历史的Manifest Files合并到一起,这些文件就是当前表可以访问的活数据。
    List<ManifestFile> manifests = Lists.newArrayList();
    Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
    Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));

    return manifests;
  }
}

提交TableMetadata

经过上一小节对事务绑定的TableMetadata的更新之后,就可以真正地执行提交动作,尝试通过TableOperations对象将最新的TableMetadata实例更新到目标表。

但由于我们现在是对整个事务流程中的某一个操作进行提交,因此不应该直接将这些变更到底层的表,即不应该调用底层的HiveTableOperations对象的commit()方法。还记得在前面提到在类BaseTransaction中定义了TransactionTableOperations吧,实际上这个在事务期间,都是通过这个Operations实例来完成TableMetadata的提交的,其具体的方法定义如下:

    public void commit(TableMetadata underlyingBase, TableMetadata metadata) {
      // underlyingBase指向的是最新的TableMetadata,
      // metata指向的是事务创建时绑定的起始TableMetadata
      if (underlyingBase != current) {
        // 如果最新的TableMetadata发生了改变,那么就需要触发更新当前事务所绑定的TableMetadata到最新版本,以保证不会将事务期间产生的变更更新到旧的Metadata上
        // trigger a refresh and retry
        throw new CommitFailedException("Table metadata refresh is required");
      }
      // 当事务引出的SnapshotUpdate对象commit时,所有变更提交到的TableMeatadata实例与事务绑定的最新的实例相同,
      // 则更新当前事务的各种引用,并不会真正提交变更到表
      BaseTransaction.this.current = metadata;

      this.tempOps = ops.temp(metadata);

      BaseTransaction.this.hasLastOpCommitted = true;
    }

通过上面的commit()方法可以知道,不同于HiveTableOperations中的实现那样,这里仅仅是更新了当前事务的成员变量,以便后续的更新操作能够继续应用到此TableMetadata实例上。

事务提交

BaseTransaction::commitTransaction

  @Override
  public void commitTransaction() {
    Preconditions.checkState(
        hasLastOpCommitted, "Cannot commit transaction: last operation has not committed");

    switch (type) {
      case CREATE_TABLE:
        commitCreateTransaction();
        break;

      case REPLACE_TABLE:
        commitReplaceTransaction(false);
        break;

      case CREATE_OR_REPLACE_TABLE:
        commitReplaceTransaction(true);
        break;

      case SIMPLE:
        commitSimpleTransaction();
        break;
    }
  }

commitSimpleTransaction

  private void commitSimpleTransaction() {
    // if there were no changes, don't try to commit
    // 在讲解AppendFiles的提交流程时,有提到它的commit()方法,实际上是更新当前事务的current引用,保存了更新的TableMetadata实例
    // 因此如果这里base == current,说明在之前没有调用于commit方法,因此不会执行事务的提交
    if (base == current) {
      return;
    }

    Set<Long> startingSnapshots =
        base.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
    try {
      Tasks.foreach(ops)
          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
          .exponentialBackoff(
              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
              2.0 /* exponential */)
          .onlyRetryOn(CommitFailedException.class)
          .run(
              underlyingOps -> {
                // 调用BaseTransaction::newAppend()方法后生成一个MergeAppend实例,同时这个实例会被添加到缓存队列中,
                // 这里就是遍历这个缓存队列,尝试再次对这个更新操作进行提交,即二次提交
                // 至于为什么要再一次提交,见后面的解析
                applyUpdates(underlyingOps);
                // 当所有的更新都提交完成时,就可以真正地执行底层表上的更新操作,更新最新的TableMetadata
                underlyingOps.commit(base, current);
              });

    } catch (CommitStateUnknownException e) {
      throw e;

    } catch (PendingUpdateFailedException e) {
      cleanUpOnCommitFailure();
      throw e.wrapped();
    } catch (RuntimeException e) {
      cleanUpOnCommitFailure();
      throw e;
    }

    // the commit succeeded

    try {
      // clean up the data files that were deleted by each operation. first, get the list of
      // committed manifests to ensure that no committed manifest is deleted.
      // A manifest could be deleted in one successful operation commit, but reused in another
      // successful commit of that operation if the whole transaction is retried.
      Set<Long> newSnapshots = Sets.newHashSet();
      for (Snapshot snapshot : current.snapshots()) {
        if (!startingSnapshots.contains(snapshot.snapshotId())) {
          newSnapshots.add(snapshot.snapshotId());
        }
      }

      Set<String> committedFiles = committedFiles(ops, newSnapshots);
      if (committedFiles != null) {
        // delete all of the files that were deleted in the most recent set of operation commits
        Tasks.foreach(deletedFiles)
            .suppressFailureWhenFinished()
            .onFailure((file, exc) -> LOG.warn("Failed to delete uncommitted file: {}", file, exc))
            .run(
                path -> {
                  if (!committedFiles.contains(path)) {
                    ops.io().deleteFile(path);
                  }
                });
      } else {
        LOG.warn("Failed to load metadata for a committed snapshot, skipping clean-up");
      }

    } catch (RuntimeException e) {
      LOG.warn("Failed to load committed metadata, skipping clean-up", e);
    }
  }

applyUpdates

  private void applyUpdates(TableOperations underlyingOps) {
    // base保存的是在创建事务实例时,绑定的最新的TableMetadata
    if (base != underlyingOps.refresh()) {
      // 由于commit事务之前,表的Metadata很可能发生了变化,因此就需要我们将所有的更新操作修正到最新的实例上
      // use refreshed the metadata
      this.base = underlyingOps.current();
      this.current = underlyingOps.current();
      for (PendingUpdate update : updates) {
        // re-commit each update in the chain to apply it and update current
        try {
          update.commit();
        } catch (CommitFailedException e) {
          // Cannot pass even with retry due to conflicting metadata changes. So, break the
          // retry-loop.
          throw new PendingUpdateFailedException(e);
        }
      }
    }
  }

乐观锁更新TableMetadata

Iceberg基于乐观锁的实现,以达到无锁提交TableMetadata的目的,因此在更新TableMetadata时,需要总是尝试获取表锁以便能够顺利更新表数据,例如对于Hive表保存Metadata时,则需要通过HiveTableOperations::doCommit(...)方法执行上锁、更新、释放锁的流程。

更新本地表

table.refresh();
文章来源:https://blog.csdn.net/u014445499/article/details/135180305
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。