Go语言中的同步原语:ErrGroup、Semaphore和SingleFlight

发布时间:2024年01月12日

1. 并发基础

并发是同时发生多个计算或事件的能力。并发通常通过同时执行多个任务或进程来实现,这些任务或进程共享相同的资源(例如内存或处理器)。并发使用的基本机制被称为锁。在Go语言中,锁是一个类型变量,它包含一个内部计数器,用于跟踪已获取的锁的数量。当一个goroutine获取一个锁时,它会将计数器增加一;当一个goroutine释放一个锁时,它会将计数器减少一。

2. 同步原语

同步原语是一组特殊的变量或类型,用于在并发程序中协调goroutine之间的通信和同步。Go语言中提供了丰富的同步原语,包括互斥锁(mutex)、读写锁(RWMutex)、等待组(WaitGroup)、一次性锁(Once)、条件变量(Cond)、错误组(ErrGroup)、信号量(Semaphore)和单次调用(SingleFlight)。

3. ErrGroup

ErrGroup是一个同步原语,它允许一组goroutine并发地执行任务,并收集所有goroutine执行过程中发生的错误。ErrGroup包含一个内部错误列表,当任何一个goroutine在执行任务时发生错误,该错误将被添加到错误列表中。ErrGroup还提供了一个Wait方法,该方法将阻塞当前goroutine,直到所有goroutine都完成执行任务,或者发生错误。

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    // 创建一个错误组
    var eg sync.ErrGroup

    // 创建三个goroutine来并发地执行任务
    for i := 0; i < 3; i++ {
        i := i
        eg.Go(func() error {
            // 模拟任务执行
            if i == 2 {
                return fmt.Errorf("error occurred in goroutine %d", i)
            }
            return nil
        })
    }

    // 等待所有goroutine完成执行任务
    if err := eg.Wait(); err != nil {
        fmt.Println(err) // 输出:error occurred in goroutine 2
    }
}

在这个示例中,我们使用ErrGroup来收集三个goroutine执行过程中发生的错误。如果任何一个goroutine在执行任务时发生错误,该错误将被添加到错误列表中,并最终在Wait方法中被打印出来。

4. Semaphore

Semaphore是一个同步原语,它用于限制可以同时访问共享资源的goroutine数量。Semaphore包含一个内部计数器,该计数器表示可用的资源数量。当一个goroutine需要访问共享资源时,它必须先获取Semaphore,如果Semaphore的计数器大于0,则该goroutine可以访问共享资源,否则该goroutine将被阻塞,直到Semaphore的计数器大于0。当一个goroutine不再需要访问共享资源时,它必须释放Semaphore,以允许其他goroutine访问共享资源。

package main

import (
    "context"
    "fmt"
    "sync"
)

func main() {
    // 创建一个信号量,限制同时可以访问共享资源的goroutine数量为2
    sem := make(chan struct{}, 2)

    // 创建三个goroutine来并发地访问共享资源
    for i := 0; i < 3; i++ {
        i := i
        go func() {
            // 获取信号量
            sem <- struct{}{}

            // 模拟访问共享资源
            fmt.Println("Goroutine", i, "is accessing the shared resource.")
            time.Sleep(1 * time.Second)

            // 释放信号量
            <-sem
        }()
    }

    // 等待所有goroutine完成
    time.Sleep(3 * time.Second)
}

在这个示例中,我们使用Semaphore来限制同时可以访问共享资源的goroutine数量为2。当一个goroutine需要访问共享资源时,它必须先获取Semaphore,如果Semaphore的计数器大于0,则该goroutine可以访问共享资源,否则该goroutine将被阻塞,直到Semaphore的计数器大于0。

5. SingleFlight

SingleFlight是一个同步原语,它确保某个操作只会被执行一次。SingleFlight包含一个内部映射,该映射将操作的key映射到操作的结果。当一个goroutine需要执行某个操作时,它必须先检查SingleFlight的内部映射中是否已经存在该操作的结果。如果存在,则该goroutine直接返回该结果,否则该goroutine将执行该操作,并将结果存储在SingleFlight的内部映射中,以便其他goroutine可以直接返回该结果。

package main

import (
    "context"
    "fmt"
    "sync"
)

var sf sync.SingleFlight

func main() {
    // 定义一个需要执行的操作
    fn := func() (int, error) {
        // 模拟执行操作
        return 100, nil
    }

    // 并发地执行该操作10次
    for i := 0; i < 10; i++ {
        go func() {
            // 获取操作的结果
            result, err := sf.Do("key", fn)
            if err != nil {
                fmt.Println(err)
                return
            }

            fmt.Println("Result:", result)
        }()
    }

    // 等待所有goroutine完成
    time.Sleep(1 * time.Second)
}

在这个示例中,我们使用SingleFlight来确保fn函数只会被执行一次。当一个goroutine需要执行fn函数时,它必须先检查SingleFlight的内部映射中是否已经存在fn函数的结果。如果存在,则该goroutine直接返回该结果,否则该goroutine将执行fn函数,并将结果存储在SingleFlight的内部映射中,以便其他goroutine可以直接返回该结果。

文章来源:https://blog.csdn.net/u013533380/article/details/135550638
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。