golang并发编程-channel

发布时间:2024年01月02日

在golang 并发编程里,经常会听到一句话:不要通过共享内存进行通信,通过通信来共享内存。下面我们会介绍下channel, 通过源码的方式去了解channel是怎么工作的。

基本结构

流程图

代码解读

type hchan struct {
    qcount   uint           // 队列中的总数据
    dataqsiz uint           // 环形队列的大小
    buf      unsafe.Pointer // 指向数据数组 qsiz 的元素
    elemsize uint16 // 循环队列中元素的大小;
    closed   uint32 // chan关闭标志
    elemtype *_type // 循环队列中元素的类型
    sendx    uint   // 待发送的数据在循环队列buf中的索引
    recvx    uint   // 待接收的数据在循环队列buf中的索引
    recvq    waitq  // 等待发送数据的 goroutine
    sendq    waitq  // 等待接收数据的 goroutine

    // 锁保护hchan中的所有字段,以及此chan 上被阻止的sudogs中的几个字段。
    lock mutex
}
type waitq struct {
	first *sudog
	last  *sudog
}
// sudog代表等待列表中的g,例如用于在通道上进行发送/接收。
type sudog struct {

	g *g 

	next *sudog
	prev *sudog
	elem unsafe.Pointer // 数据元素 (可能指向栈)

	
	// 以下字段永远不会同时访问。
    // 对于chan,waitlink仅由g访问。
    // 对于信号量,所有字段(包括上面的那些)只有在持有semaRoot锁时才能访问。
	acquiretime int64
	releasetime int64
	ticket      uint32

	// isSelect表示g在 select 上
	isSelect bool

	// success 表示通道 c 上的通信是否成功。
    // 如果 goroutine 被唤醒是因为通道 c 上传递了值,则为 true,
    // 如果唤醒是因为通道 c 关闭,则为 false。
	success bool

	parent   *sudog // semaRoot binary tree
	waitlink *sudog // g.waiting list or semaRoot
	waittail *sudog // semaRoot
	c        *hchan // channel
}

创建

例子

// make(chan 类型, 大小)
chBuf := make(chan int, 1024) // 缓冲队列
ch := make(chan int) // 无缓存队列(也叫同步队列)

我很重要: channel带缓存的异步,不带缓存同步

思维图

代码解读

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // 检查数据是不是超过64kb
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
	// 检查缓存是否溢出
    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // 当buf中存储的元素不包含指针时,chan不包含对GC感兴趣的指针。
    // buf指向相同的分配,elemtype固定不变。
    // SudoG 的引用来自其所属的线程,因此无法收集。
    // TODO(dvyukov,rlh):重新思考收集器何时可以移动分配的对象。
    var c *hchan
    switch {
        case mem == 0:  // 队列或元素大小为零
        // 只为hchan分配内存
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // 竞态检查,利用这个地址进行同步操作.  
        c.buf = c.raceaddr()
        case elem.ptrdata == 0://元素不是指针
		// 分配一块连续的内存给hchan和buf    
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
        default: // 默认
        // 单独为 hchan 和缓冲区分配内存
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true) 
    }

    c.elemsize = uint16(elem.size) // 循环队列中元素的大小
    c.elemtype = elem // 元素类型
    c.dataqsiz = uint(size)  
    lockInit(&c.lock, lockRankHchan) // 锁优先级

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

channel 创建的时候分以下三种情况:

1- 队列或元素大小为零,调用mallocgc 在堆上给chan 创建一个hchansize大小的内存空间

2- 元素不是指针,调用mallocgc 在堆上给chan 创建一个 大小hchansize+ mem连续的内存空间(队列+buf缓存空间)

3- 默认情况下,调用mallocgc 单独为 hchan 分配内存

发送

代码解读

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil { // 如果chan为nil
		if !block { // 非阻塞直接返回flase
			return false
		}
        // 阻塞直接挂起抛出异常
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	
	// Fast path:在不获取锁的情况下检查失败的非阻塞操作。
	// 在观察到通道未关闭后,我们观察到 channel 未准备好发送。(c.closed和full())
    
    //当 channel 不为 nil,并且 channel 没有关闭时,
    // 如果没有缓冲区且没有接收者rec或者缓冲区已经满了,返回 false。
    if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock) // 加锁
	
	if c.closed != 0 { // 如果已经关闭了,就抛出异常
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 有等待的接收者。
	if sg := c.recvq.dequeue(); sg != nil {
        // 我们将要发送的值直接传递给send,绕过通道缓冲区(如果有的话)。
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
    // 缓冲区存在空余空间时
	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)     // 找到要发送数据到循环队列buf的索引位置
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
        // 数据拷贝到循环队列中
		typedmemmove(c.elemtype, qp, ep)
        // 将待发送数据索引加1,
		c.sendx++
        // 如果到了末尾,从0开始 (循环队列)
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
        // chan中元素个数加1
		c.qcount++
        // 释放锁返回true
		unlock(&c.lock)
		return true
	}

	if !block { // 缓冲区没有空间,直接返回false
		unlock(&c.lock)
		return false
	}
    
	// channel上阻塞。一些receiver将为我们完成操作。
    gp := getg() // 获取发送数据使用的 G
    // acquireSudog() 获取一个 sudog,如果缓存没有获取的到,就新建一个。
	mysg := acquireSudog() 
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

    // 在分配 elem 和将 mysg 加入 gp.waiting 队列,
    // 没有发生堆栈分裂,copystack 可以找到它。
	mysg.elem = ep
	mysg.waitlink = nil 
	mysg.g = gp
	mysg.isSelect = false // 是否在 select 
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
     // 调用 c.sendq.enqueue 方法将配置好的 sudog 加入待发送的等待队列
	c.sendq.enqueue(mysg)

	atomic.Store8(&gp.parkingOnChan, 1)
    // 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel接收者的激活
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// 确保发送的值保持活动状态,直到接收者将其复制出来
	KeepAlive(ep)

	// G 被唤醒
	if mysg != gp.waiting { // 如果sudog 结构体中等待的 g  和获取到不一致
		throw("G waiting list is corrupted")
	}
    
	gp.waiting = nil。// 等待请空
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
  1. chan 为nil 非阻塞的话就返回false, 阻塞panic
  2. 当 channel 不为 nil且 channel 没有关闭时,如果没有缓冲区且没有接收者rec或者缓冲区已经满了,返回 false。
  3. 如果存在等待的接收者,通过 send 直接将数据发送给阻塞的接收者
  4. 如果缓冲区存没有满,chanbuf + typedmemmove 将发送的数据写入 chan 的缓冲区;
  5. 如果缓冲区已满时,把发送数据的goroutin sendq中等待其他 G 接收数据;
func full(c *hchan) bool {
	// c.dataqsiz是不可变的(在创建通道后不可再写操作),因此在通道操作期间的任何时候读取都是安全的。
	if c.dataqsiz == 0 { // 数据是空的
		return c.recvq.first == nil//指针读取是近似原子性的
	}
	// 数据满了
	return c.qcount == c.dataqsiz//指针读取是近似原子性的
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
            // 如果我们通过缓冲区,即使我们直接复制。
            //请注意,我们只需要在raceenabled时增加头/尾位置。
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz -- 发送的索引位置获取原理
		}
	}
	if sg.elem != nil {// 直接把要发送的数据拷贝到receiver的内存地址
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}

    // 将等待接收数据的 G 标记成可运行状态 Grunnable (goready-ready: casgstatus(gp, _Gwaiting, _Grunnable))
    //把该 G 放到发送方所在的处理器的 runnext 上等待执行, 该处理器在下一次调度时会立刻唤醒数据的接收方(runqput方法)
	// runqput 尝试将 g 放入本地可运行队列:
		// 如果 next 为 false,runqput 会将 g 添加到可运行队列的尾部。
		// 如果 next 为 true,runqput 将 g 放入 _p_.runnext 槽中。 这里为true
		// 如果运行队列已满,runnext 会将 g 放入全局队列。
		// 仅由所有者 P 执行。
    goready(gp, skip+1) // 唤醒等待的接收者goroutine
}

1- 直接把要发送的数据拷贝到receiver的内存地址

2- 将等待接收数据的 G 标记成可运行状态 Grunnable 。把该 G 放到发送方所在的处理器的 runnext 上等待执行, 该处理器在下一次调度时会立刻唤醒数据的接收方

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	//src 在我们的栈上,dst 是另一个栈上的一个插槽。
    
    // 一旦我们从sg中读取了sg.elem,如果目标堆栈被复制(收缩),它将不再被更新。
    // 因此,请确保在读取和使用之间不会发生抢占点。
    
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    // 不需要cgo写屏障检查,因为dst始终是Go内存。
    memmove(dst, src, t.size)
}

sendDirect:数据拷贝到了接收者的内存地址上

// chanbuf(c, i) 是指向缓冲区中第 i 个槽的指针
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

接收

代码解读
?

//chanrecv在通道c上接收数据,并将接收到的数据写入ep。
//ep可能为零,在这种情况下,收到的数据将被忽略。
//如果 block == false 且没有可用元素,则返回 (false, false)。
//否则,如果c是闭的,则返回零*ep并返回(true,false)。
//否则,用元素填充 *ep 并返回 (true, true)。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled:不需要检查ep,因为它始终位于堆栈上,或者是由reflect分配的新内存
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block { // 如果c为空且是非阻塞调用,那么直接返回 (false, false)
			return
		}
        // 若果阻塞直接挂起,抛出错误
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: 在不获取锁的情况下检查失败的非阻塞操作
	if !block && empty(c) { // 通过empty()判断是无缓冲chan或者是chan中没有数据
        // 在观察到channel未准备好接收后,看channel是否已关闭。

		if atomic.Load(&c.closed) == 0 { // 如果chan没有关闭,则直接返回 (false, false)
			return
		}
		// 如果chan关闭, 为了防止检查期间的状态变化,二次调用empty()进行原子检查二次调用empty()进行原子检查,
        // 如果是无缓冲chan或者是chan中没有数据,返回 (true, false) 
        // 通道已不可逆地关闭。 为了防止检查期间的状态变化,重新检查通道是否有任何待接收的数据
        if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)
	// 如果已经关闭且chan中没有数据,返回 (true,false)
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
	 // 从sendq 队列获取等待发送的 G     
	if sg := c.sendq.dequeue(); sg != nil {
        // 在 channel 的sendq队列中找到了等待发送的 G,取出队头等待的G。
        // 找到一个等待发送的sender 。如果缓冲区大小为 0,则直接从sender接收值 。
        // 否则,从队列头部接收,并将发送者的值添加到队列尾部(由于队列已满,这两个值都映射到同一个缓冲区下标)。
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// 如果缓冲区有数据
	if c.qcount > 0 {
		// 直接从队列接收
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
        // 接收数据地址ep不为空,直接从缓冲区复制数据到ep  
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++ // 待接收索引加1
		if c.recvx == c.dataqsiz { // 如果循环队列到了末尾,从0开始 
			c.recvx = 0
		}
		c.qcount-- // 缓冲区数据减1
		unlock(&c.lock)
		return true, true
	}

	if !block { // 如果是select非阻塞读取的情况,直接返回(false, false)
		unlock(&c.lock)
		return false, false
	}

	//没有可用的发送者:阻塞channel。
	gp := getg() // 获取当前 goroutine 的指针,用于绑定给一个 sudog 
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
  1. 如果chan 为nil ,非阻塞的话返回(false, false),阻塞的panic
  2. chan已经关闭并且缓存没有数据,直接返回(false, false)
  3. 如果是无缓冲chan或者是chan中没有数据,返回 (true, false)
  4. chan 的sendq队列中存在挂起的G, 会将recvx 索引的数据拷贝到接收变量内存中并将sendq队列中G 数据拷到缓存
  5. 如果缓冲区有数据,直接读取recvx索引的数据
  6. 如果没有可用的发送者,获取当前 G 的指针,用于绑定给一个 sudog 并加入到recvq,等待调度器唤醒
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // 从sender 复制数据
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 队列已满。从队列的头部取走该项目。
        // 让发送方将其项目排入队列的尾部。
        // 由于队列已满,这两个位置是相同的。
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // 将数据从队列复制到receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 将数据从sender复制到队列
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}
  • 如果 chan 不存在缓冲区:调用 recvDirect 将 发送队列中 G 存储的 elem 数据拷贝到目标内存地址中;
  • 如果 chan 存在缓冲区:
    • 1- 将队列中的数据拷贝到receiver 的内存地址;
    • 2- 将 sender 头的数据拷贝到缓冲区中,释放一个阻塞的 sender;
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst 在我们的栈或堆上,src 在另一个栈上。
   // chan已锁定,因此在此操作期间src将不会移动。
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}
// empty报告从c读取是否会阻塞(即通道为空)。它使用对可变状态的单个原子读取。
func empty(c *hchan) bool {  
    // c.dataqsiz 是不可变的  
    if c.dataqsiz == 0 {    
        // 发送队列为空
        return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil 
    }  
    return atomic.Loaduint(&c.qcount) == 0
}

关闭

代码解读
?

func closechan(c *hchan) {
	if c == nil { // 关闭空的chan,会panic
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 { // 关闭已经关闭的chan,会panic
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}
	// chan的closed置为关闭状态
	c.closed = 1

	var glist gList // 申明一个存放所有接收者和发送者goroutine的list

	// 处理所有的recv(等待发送数据的 goroutine)
	for { 
		sg := c.recvq.dequeue() // 将缓存队列待处理的recv 拿出来
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp) // 放到 glist
	}

	// release all writers (they will panic)
	for { // 获取所有发送者
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

// 唤醒所有的glist中的goroutine
    for !glist.empty() { 
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

1- 如果channel 是nil 空指针或者已经关闭,会pnaic

2- close操作会做一些收尾的工作:将 recvq , sendq 数据放到 glist(释放队列)中,统一进行释放掉。

3- 有一个点需要注意,先释放 sendq 中的recver 不会存在问题,向关闭的channel 接收数据,最多返回nil。但是如果recvq 的 sender 就会出现问题,因为不能往关闭 channel 发送数据(panic)

总结

  • channel 总体上分三类:

1- 同步:没有缓存

2- 异步: 有缓存

3- chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送

  • 读/写/关闭情况可以下面这个表概括

文章来源:https://blog.csdn.net/lin_keys/article/details/135340506
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。