欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 美景 > 【Go万字洗髓经】Golang中sync.Mutex的单机锁:实现原理与底层源码

【Go万字洗髓经】Golang中sync.Mutex的单机锁:实现原理与底层源码

2025/4/2 1:53:19 来源:https://blog.csdn.net/theaipower/article/details/146487121  浏览:    关键词:【Go万字洗髓经】Golang中sync.Mutex的单机锁:实现原理与底层源码

在这里插入图片描述

本章目录

  • 1. sync.Mutex锁的基本用法
  • 2. sync.Mutex的核心原理
    • 自旋到阻塞的升级过程
      • 自旋+CAS
    • 饥饿模式
  • 3. sync.Mutex底层源码
    • Mutex结构定义
    • 全局常量
    • Mutex.Lock()方法
      • 第一次CAS加锁能够成功的前提是?
      • 竞态检测
    • Mutex.lockSlow()
      • lockSlow的局部变量
      • 自旋空转
      • state新值构造
      • 新旧值替换
        • 上锁成功
        • 阻塞挂起
    • sync.Unlock()方法
      • 解锁主干方法
      • unlockSlow的三种情况
        • 异常情况
        • 正常模式
        • 饥饿模式

1. sync.Mutex锁的基本用法

先来看看最基本的用法,也就是下面这幅图中可以看到Mutex的基本用法,就是对敏感资源进行操作,控制并发。并发写是最需要考虑的,因为可能会导致状态不一致的回滚问题。

除了Mutex,还有RWMutex读写锁,并发读是一个幂等性的,对系统并不会造成一个很大的问题,所以并发读的情况下不会有一个额外的加锁的成本,但是一旦有写,那么就需要进行并发控制,写会对数据进行变更。

如果是Get操作,那么我们只需要对下面代码中的m.Mutex.Lock()更改为m.RWMutex.RLock()即可。

在这里插入图片描述

2. sync.Mutex的核心原理

自旋到阻塞的升级过程

当一个goroutine加锁的时候发现已经被抢占了,此时有两种策略,分别是

1、阻塞+唤醒:将当前这个goroutine阻塞挂起,直到这个锁释放,然后以回调的方式将这个阻塞的goroutine重新唤醒,进行锁的争夺。可以视作为悲观锁,其核心思想是假设冲突会发生。

2、自旋+CAS:基于自旋结合CAS的方式,重新校验锁的状态并尝试获取锁,始终把主动权掌握在自己的手里。同时这个自旋+CAS也叫做乐观锁。线程并不会放弃CPU资源,可以认为不需要别的人来通知我这个锁被释放了,我们认为这个锁很快就可以得到了,所以通过自旋的形式主动去争取获得这个资源,不需要把主动权交给调度器。

自旋+CAS

自旋是一种忙等待(busy-waiting)的策略。当一个goroutine尝试获取锁时,如果发现锁已经被其他goroutine占用,它不会直接进入阻塞状态,而是进入一个循环,不断地检查锁的状态,直到锁被释放。这种策略的优点是响应速度快,因为goroutine始终处于运行状态,一旦锁释放,它能够立即尝试获取锁。然而,它的缺点是会占用CPU资源,因为goroutine一直在循环中检查锁的状态,即使锁可能需要很长时间才会被释放。

CAS,也就是Compare-And-Swap,是一种原子操作,用于实现无锁编程。它的基本原理是:比较当前值与预期值,如果相等,则将当前值更新为新值;如果比较结果不相等,则不做任何操作(一气呵成,原子操作,不可以拆解)。在锁的场景中,CAS用于检查锁的状态并尝试获取锁,也就是通过CAS不断去进行一个获取锁上锁的操作。

自旋+CAS的具体步骤如下:

  1. goroutine尝试获取锁时,首先读取锁的状态。
  2. 如果锁是空闲的(未被占用),goroutine会使用CAS操作将锁的状态从“未占用”更新为“已占用”。
  3. 如果锁已经被占用,goroutine会进入自旋状态,不断重复上述步骤,直到CAS操作成功。

阻塞+唤醒的方式不浪费CPU时间,但是需要挂起goroutine协程,进行上下文切换,操作比较重,比较适合并发竞争激烈的场景。

自旋+CAS无需阻塞goroutine,短期角度来看代价比较容易接受,但是如果长时间都没有得到,就会浪费CPU的时间片,比较适合并发竞争强度比较低的场景。

这两种方式比较极端,go中的sync.Mutex结合两种方案的使用场景,制定了锁的升级过程,这个过程就是从乐观转换为悲观的态度:首先保持乐观,用自旋+CAS的策略竞争锁,当到达一定条件之后,判断为过于激烈,转为阻塞+唤醒模式。

达到所谓的一定条件有以下几个方式:

1、当自旋达到4次之后还没有结果之后;
2、CPU单核或者gmp模型中仅有1个P调度器(这个时候自旋,其他的goroutine根本没机会释放锁,自旋纯属空转);
3、当前P的执行队列中仍有待执行的G(避免因为自旋影响到GMP的调度效率)。

饥饿模式

上面的锁的升级策略可以看做是资源竞争时性能方面的一个策略,那么饥饿模式则是对公平性的一个方法。

非饥饿模式(正常模式/非公平模式):默认采用的模式,当由goroutine从阻塞队列(FIFO先进先出,所以每次唤醒的是队列头部的goroutine,在阻塞队列里面可以看做是一个狭隘的公平,但是非公平并不是指的阻塞队列里边的这些goroutine的竞争)被唤醒的时候,会和此时先进入抢锁goroutine进行锁资源的争夺(也就是和正在自旋的goroutine进行竞争),如果抢锁失败,就会重新回到阻塞队列的头部,非公平就体现在这里。被唤醒的老的goroutine相比新的goroutine是劣势地位,因为新的goroutine已经占用CPU时间片了,并且新的goroutine可能有多个,数量上更有优势。

饥饿模式(公平模式):当Mutex阻塞队列中存在饥饿态(长时间取不到这个锁,因为非公平机制)的goroutine时,会进入饥饿模式,将抢锁流程由非公平机制转换为公平机制。在这种情况下,锁的所有权按照阻塞队列的顺序依次进行传递,新的goroutine进行流程的时候不得先抢锁,而是进入队尾排队。

两种模式的转换条件:

正常模式转换为饥饿模式:当阻塞队列存在goroutine等锁超过1ms而不得,则进入饥饿模式。
饥饿模式转换为正常模式:当阻塞队列已经清空,或取得锁的goroutine等锁的时间低于1ms时,转换为正常模式。

3. sync.Mutex底层源码

Mutex结构定义

state是最核心的字段,不同的bit位分别存储了mutexLockedmutexWokenmutexStarving等信息,分别对应是否上锁(也就是实现锁需要的一个状态变量,如0表示未加锁,1表示已加锁,这是最核心的最本质的东西)、是否有goroutine从阻塞队列中被唤醒、是否处于饥饿模式。

mutexLocked:是否上锁(也就是实现锁需要的一个状态变量,如0表示未加锁,1表示已加锁,这是最核心的最本质的东西)。

mutexWoken:是否有goroutine从阻塞队列中被唤醒。

mutexStarving:是否处于饥饿模式

sema表示用于阻塞和唤醒goroutine的信号量。
在这里插入图片描述

全局常量

在这里插入图片描述

const (mutexLocked = 1 << iota // mutex is lockedmutexWokenmutexStarvingmutexWaiterShift = iotastarvationThresholdNs = 1e6
)

mutexLocked = 1:state 最右侧的一个 bit 位标志是否上锁,0-未上锁,1-已上锁;

mutexWoken = 2:state 右数第二个 bit 位标志是否有 goroutine 从阻塞中被唤醒,0-没有,1-有;

mutexStarving = 4:state 右数第三个 bit 位标志 Mutex 是否处于饥饿模式,0-非饥饿,1-饥饿;

mutexWaiterShift = 3:右侧存在 3 个 bit 位标识特殊信息,分别为上述的 mutexLocked、mutexWoken、mutexStarving;

starvationThresholdNs = 1 ms:sync.Mutex 进入饥饿模式的等待时间阈值.

itoa是一个特殊的常量生成器,自增枚举值,通常用在const模块中,简化常量的定义,也就是iota的值从0开始,并且随每个常量的定义递增。

这段代码中,iota 的值从 0 开始,依次递增:(这里的1、2、4是指这个位置上的二进制表示)
mutexLocked 的值为 1 << 0,即 1。
mutexWoken 的值为 1 << 1,即 2。
mutexStarving 的值为 1 << 2,即 4。
mutexWaiterShift 的值为 3。

state是int32类型,而前面已经有3个占了,剩下的29位可以记录阻塞队列中的协程goroutine数量,最多可以记录2^29-1个。

在这里插入图片描述
后续可以直接通过一些运算符号进行特殊的判断:

state & mutexLocked:(与运算,最右侧的值进行运算)判断是否上锁;
state | mutexLocked:(或运算,把最右侧的值置为1)加锁动作;
state & mutexWoken:判断是否存在抢锁的协程;
state | mutexWoken:更新状态,标识存在抢锁的协程;
state &^ mutexWoken:更新状态,标识不存在抢锁的协程(&^ 是与异或,比如说 x&^y,如果y=1,结果为0,如果y=0,结果为x,也就是如果mutexWoken的某一位为1,那么会把state上的这个置为0。一句话来概述,作用是 清除 state 中的 mutexWoken 标志位,同时保留其他位不变。)
state & mutexStarving:判断是否处于饥饿模式。
state | mutexStarving:置为饥饿模式。
state>>mutexWaiterShif:获取阻塞等待的协程数,也就是相当于把state右移3位,因为有3个标志位表示了这个state的状态。
state+=1 << mutexWaiterShif :阻塞等待的协程数+1.

Mutex.Lock()方法

Lock()方法的作用是尝试获取互斥锁(mutex)。如果锁已经被占用,它会进入慢路径(lockSlow方法)来处理锁的等待和获取。

下图是Go1.23版本的Lock源码,逻辑非常简单,可以视为先进行一次CAS操作,成功了就会让当前这个goroutine持有锁,失败了就会进入lockslow流程。

在这里插入图片描述
atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked)就是CAS,是一个原子操作,尝试将m.state从0(表示锁未被占用)更新为mutexLocked(表示锁已被占用)。如果m.state的当前值为0,则将m.state更新为mutexLocked,并返回true。如果m.state的当前值不是0,则什么也不做,返回false。

if race.Enabled:是Go语言的竞态检测(Race Detector)功能。如果竞态检测被启用,race.Acquire会记录当前锁的获取操作。这有助于在开发阶段检测潜在的竞态条件。

如果锁被成功获取(即atomic.CompareAndSwapInt32返回true),则直接返回,表示锁已经被当前goroutine持有。

如果快速路径失败,也就是automic.CASint32返回false,就会进入lockSlow方法进入慢路径处理操作,lockslow会将goroutine加入等待队列,阻塞当前goroutine,直到锁被释放,当锁释放的时候,会从等待队列中唤醒一个goroutine,并尝试再次获取锁。

第一次CAS加锁能够成功的前提是?

首先对比state是不是0,并且能够从0改成1,才能加锁成功。

由0置为1的前提是state都为0,那么这个state都为0的前提是左侧29个bit位都为0,意味着这个锁当前没有阻塞的goroutine存在,并且这个锁是正常模式(第3个bit位,mutexStarving位 为0时表示正常模式,为1表示饥饿模式),并且mutexWoken(第2位bit位为0)表示无协程正在取锁,且最重要的第一位mutexLocked也为0,回顾一下这个图,也就是只有所有状态成立的情况下,才能加锁成功。

任何的goroutine进来都会尝试第一个CAS操作,都会尝试取锁。

在这里插入图片描述

竞态检测

		if race.Enabled {race.Acquire(unsafe.Pointer(m))}

这里有个比较特殊的东西,竞态检测,好像Go的1.19版本是没有这两行代码的。race.Enabled 是一个布尔值,表示竞态检测器是否被启用。当使用 -race 标志编译和运行程序时,race.Enabled 会被设置为 true。如果竞态检测器未启用(即没有使用 -race 标志),race.Enabledfalse,这两行代码不会执行。

race.Acquire(unsafe.Pointer(m))race.Acquire 是竞态检测器提供的一个函数,用于记录当前协程获取了一个锁。unsafe.Pointer(m) 是将 m(Mutex 的指针)转换为 unsafe.Pointer 类型。unsafe.Pointer 是一个通用指针类型,可以用于表示任意类型的指针。调用 race.Acquire 的目的是告诉竞态检测器:当前协程正在获取一个锁,并且这个锁的地址是 m。

竞态检测是Go语言提供的一种工具,用于检测并发程序中的竞态条件(Race Condition)。竞态条件是指多个线程(或协程)同时访问共享资源时,由于访问顺序不确定而导致程序行为不可预测的问题。例如,多个协程同时读写同一个变量,可能会导致变量的值变得不确定。

  • Race Detector集成在Go工具链中,通过以下方式工作:

编译时插入检测代码:当使用-race标志编译程序时,编译器会在每个内存访问操作中插入检测代码。
运行时监控:运行时库会监控对共享变量的访问,记录访问的时间、操作类型(读或写)、以及操作的协程。
检测冲突:如果检测到以下情况之一,则认为存在数据竞争:
当前操作是写操作,且与最近的读或写操作并发发生,且这些操作来自不同的协程。
当前操作是读操作,且与最近的写操作并发发生,且这些操作来自不同的协程。
报告问题:当检测到数据竞争时,Race Detector会生成详细的报告,包括发生问题的代码位置、涉及的协程以及栈跟踪信息。

启用Race Detector非常简单,只需在Go命令中添加-race标志即可,比如命令 go run -race hellogoroutine.go编译运行程序。

比如我们有下面这个代码:

package mainimport ("fmt""sync"
)var counter intfunc main() {var wg sync.WaitGroupfor i := 0; i < 100; i++ {wg.Add(1)go func() {defer wg.Done()counter++}()}wg.Wait()fmt.Println("Counter:", counter)
}

如果运行时使用-race标志,假设得到了下面的这个输出,那么其实得到的信息是:

在地址 0x00c00009e008 处(通常是共享变量的内存地址),goroutine 8 执行了一个读操作,main.main.func1()指明了读操作发生的具体函数和文件位置,main.main.func1() 是一个匿名函数,位于 /path/to/main.go 文件的第 15 行。

Previous write at 0x00c00009e008 by goroutine 6:表示在相同的地址 0x00c00009e008 处,goroutine 6 之前执行了一个写操作。
这表明 goroutine 6goroutine 8 对同一个变量进行了并发的读写操作,导致了数据竞争。

Goroutine 8 (finished) created at表示协程已经完成了,Goroutine 6 (running) created at表示正在运行,Found 1 data race(s)竞态检测器总结报告,表明检测到了 1 个数据竞争问题。

通常Race Detector的报告提供了详细的上下文信息,帮助开发者快速定位和修复数据竞争问题。通过使用同步机制(如互斥锁),可以避免并发访问导致的竞态条件,从而提高程序的稳定性和可靠性。

$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c00009e008 by goroutine 8:main.main.func1()/path/to/main.go:15 +0x48Previous write at 0x00c00009e008 by goroutine 6:main.main.func1()/path/to/main.go:15 +0x60Goroutine 8 (finished) created at:main.main()/path/to/main.go:12 +0x7eGoroutine 6 (running) created at:main.main()/path/to/main.go:12 +0x7e
==================
Counter: 100
Found 1 data race(s)
exit status 66

Mutex.lockSlow()

func (m *Mutex) lockSlow() {var waitStartTime int64starving := falseawoke := falseiter := 0old := m.statefor {// Don't spin in starvation mode, ownership is handed off to waiters// so we won't be able to acquire the mutex anyway.if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}new := old// Don't try to acquire starving mutex, new arriving goroutines must queue.if old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// The current goroutine switches mutex to starvation mode.// But if the mutex is currently unlocked, don't do the switch.// Unlock expects that starving mutex has waiters, which will not// be true in this case.if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {// The goroutine has been woken from sleep,// so we need to reset the flag in either case.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken}if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 1)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}

lockSlow的源码非常长,这里我贴在了上边,后面讲解每一部分的时候再对应进行一个局部代码贴图操作。

lockSlow的局部变量

在这里插入图片描述
waitStartTime:标识当前 goroutine 在抢锁过程中的等待时长,单位:ns;
starving表示是否处于饥饿模式,awoke表示当前模式是否已有协程在等锁。
iter表示当前goroutine参与自旋的次数,old则是存储锁的旧的state的值。

自旋空转

自旋空转的代码如下所示。

for {// Don't spin in starvation mode, ownership is handed off to waiters// so we won't be able to acquire the mutex anyway.if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}new := old// Don't try to acquire starving mutex, new arriving goroutines must queue.if old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// The current goroutine switches mutex to starvation mode.// But if the mutex is currently unlocked, don't do the switch.// Unlock expects that starving mutex has waiters, which will not// be true in this case.if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {// The goroutine has been woken from sleep,// so we need to reset the flag in either case.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken}if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 1)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}

在这里插入图片描述

我们来一部分一部分研究源码看看,首先第一个if会取出旧的state的值,然后对(mutexLocked|mutexStarving)进行的操作,看看是否等于mutexLocked,如果成立,就代表锁已经加上了,并且还处于正常模式,不处于饥饿模式,也就是下面这个模式,结果就是1,也就是判断==mutexLocked

然后继续通过runtime_canSpin(iter)根据条件(上述的3个条件)来判断是否还能自旋,如果满足其中一个,就是false,就会自旋失败。

接着再通过mutexWoken来判断一次if,如果进入了这个if分支,说明当前锁阻塞队列有协程,但还没有被唤醒,因此需要将mutexWoken置为ture,避免再有其他协程被唤醒和自己抢锁。

随后执行runtime_doSpin(),告知执行器还在自旋中,然后将iter++,并且将新的state赋值给old,然后跳过这次循环,到下一次循环中。

在这里插入图片描述

所以,上图中的整体代码逻辑如下。

在这里插入图片描述

第一个if代码通过条件判断 old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) 来检查当前锁的状态是否允许进行自旋操作。这里,old 是锁的当前状态值,mutexLocked 表示锁已被占用,mutexStarving 表示锁处于饥饿模式。如果锁仅被占用但未进入饥饿模式,并且当前迭代次数 iter 满足自旋条件(通过 runtime_canSpin 函数判断),则进入自旋逻辑。

接下来,代码尝试设置 mutexWoken 标志位。这是为了通知 Unlock 方法,当前协程正在自旋尝试获取锁,因此不需要唤醒其他阻塞的协程。这一逻辑通过 atomic.CompareAndSwapInt32 原子操作实现,只有在当前状态 old 满足特定条件(未设置 mutexWoken 标志且有其他协程在等待队列中)时,才会将 mutexWoken 标志位设置为 1(因为我自己在取锁,所以我要通过设置这个标志位,来告诉别人我自己在取锁,这个实现就是通过原子操作atomic.CASint32(&m.state,old,old|mutexWoken 进行操作的,也就是让老的state来或上这个mutexWoken来置为1,避免还有其他的协程goroutine被唤醒和自己抢锁)。如果设置成功,awoke 标志被置为 true,表示当前协程已进入自旋状态。这个逻辑的目的是在当前协程尝试获取锁时,通知其他可能正在等待锁的协程,当前协程正在自旋尝试获取锁,因此其他协程可以暂时不被唤醒。

我们来详细看看这个内嵌的长if的语句判断逻辑:if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken),首先是判断awoke,标记当前这个协程是否已经尝试设置过mutexWoken标志位,如果为false,则表示当前协程未尝试设置mutexWoken标志位,因此可以继续执行后续的逻辑,注意是当前本身这个协程;然后判断old&mutexWoken == 0mutexWoken 是一个标志位,表示是否有协程正在自旋尝试获取锁;old>>mutexWaiterShift != 0将锁的状态值右移 mutexWaiterShift 位,得到等待队列中协程的数量,表示等待队列中至少有一个协程正在等待锁。atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)这是一个原子操作,用于将锁的状态从 old 更新为 old|mutexWoken

随后调用 runtime_doSpin 函数执行自旋操作,这是一种轻量级的忙等待,告知调度器p目前处于自旋模式,目的是快速获取锁,避免立即进入阻塞状态。自旋次数通过 iter 计数器递增记录,每次自旋后更新锁的状态 old,以便在下一次循环中重新检查锁的状态。

如果自旋成功获取锁,协程将继续执行;如果自旋失败,代码将继续循环,尝试再次自旋或进入其他逻辑分支。这段代码的核心目的是在锁竞争不激烈的情况下,通过自旋快速获取锁,从而减少上下文切换的开销,提高锁的获取效率。


state新值构造

func (m *Mutex) lockSlow() {// ...for {// 自旋抢锁失败后处理 ...new := oldif old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {new &^= mutexWoken}// ...}
}

先通过构造一个new,对状态值进行一个预写,后续进行一个CAS操作,把new写入state。 从自旋中走出来后,会存在两种分支,要么加锁成功,要么陷入自锁,不论是何种情形,都会先对 sync.Mutex 的状态新值 new 进行更新。

能够到达new,意味着上面的if条件没有命中,也就是会存在几种情况,第一个就是锁还没有被占有,锁是自由的;第二种是锁处于饥饿模式了,第三种是不满足自旋的条件(三种)。

接下来详细看看这个代码:

if old&mutexStarving == 0 {new |= mutexLocked
}

首先如果当前锁的状态中没有设置 mutexStarving(饥饿模式)标志位,说明锁没有进入饥饿模式,那么将 mutexLocked(锁已占用)标志位设置为 1。这表示当前协程尝试获取锁,并将其标记为已占用,这个过程就是“抢锁”的过程,也就是理解为抢锁的过程,后续再check一下就可以。所以说这个加锁的动作不一定是百分百成立的。

简单回顾下饥饿模式,怕看到这总是容易往上翻饥饿模式的一个概念。

饥饿模式(公平模式):当Mutex阻塞队列中存在饥饿态(长时间取不到这个锁,因为非公平机制)的goroutine时,会进入饥饿模式,将抢锁流程由非公平机制转换为公平机制。在这种情况下,锁的所有权按照阻塞队列的顺序依次进行传递,新的goroutine进行流程的时候不得先抢锁,而是进入队尾排队。

if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift
}

如果当前锁的状态中设置了 mutexLockedmutexStarving 标志位,说明锁已经被占用或者处于饥饿模式。在这种情况下,将等待队列中的协程计数加 1。这是通过将 1 左移 mutexWaiterShift 位(等待队列计数的偏移量)并加到 new 上来实现的,也就是把阻塞的goroutine数量+1即可,因为此时锁已经被占有了,并且我已经走出了自旋,说明没有自旋的条件,一定要被阻塞挂起的。又或者如果是饥饿模式了,那么我们作为一个新的goroutine,就需要执行公平模式,只能阻塞挂起,因为饥饿模式是绝对公平的。

if starving && old&mutexLocked != 0 {new |= mutexStarving
}

注意这里的starving是一个局部变量,而不是state里面的starving,后边会有对这个starving的一个操作,来开启饥饿模式。如果当前协程决定将锁切换到饥饿模式(starving 为 true),并且锁当前是被占用的(old&mutexLocked != 0),则将 mutexStarving 标志位设置为 1。这表示当前协程希望将锁切换到饥饿模式,以便后续的协程能够按照先进先出(FIFO)的顺序获取锁。需要注意的是,如果锁当前是未被占用的,则不会切换到饥饿模式,因为解锁操作期望饥饿模式下有等待的协程,否则会导致不一致的状态。

if awoke {if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken
}

如果当前协程awoketrue,则需要重置 mutexWoken 标志位。首先,检查 new 中是否设置了 mutexWoken 标志位,如果没有设置,则抛出异常,因为这表示锁的状态不一致。如果设置正确,则通过 new &^= mutexWokenmutexWoken 标志位清零,表示当前协程已经完成了唤醒过程,因为唤醒就是为了获得这个锁,所以也就可以理解为不再需要这个标志位了,因此需要重置。

也就是可以理解为:倘若局部变量标识是已有唤醒协程抢锁,说明 Mutex.state 中的 mutexWoken 是被当前 gouroutine 置为 1 的,但由于当前 goroutine 接下来要么抢锁成功,要么被阻塞挂起,因此需要在新值中将该 mutexWoken 标识更新置 0。

mutexWoken 是一个标志位,存储在 Mutex.state 中,用于表示是否有协程正在自旋尝试获取锁。当一个协程被唤醒时,它可能会尝试通过自旋来获取锁,并设置 mutexWoken 标志位。然而,一旦这个协程成功获取锁或者进入阻塞状态,它需要清除 mutexWoken 标志位,以避免误导后续的 Unlock 操作。

awoke是一个布尔变量,用于标记当前协程是否已经被唤醒。当一个协程因为锁的释放而被唤醒时,awoke会被设置为true。此时,协程可能会尝试通过自旋获取锁。

mutexWoken是一个标志位,存储在锁的状态变量state中。它的主要作用是通知锁的释放者(即调用Unlock的协程)当前是否有协程正在自旋尝试获取锁。如果一个协程正在自旋尝试获取锁,它会通过原子操作设置mutexWoken标志位。当锁被释放时,Unlock方法会检查mutexWoken标志位。如果该标志位被设置,说明有协程正在自旋,因此Unlock不会唤醒等待队列中的其他协程,以避免不必要的上下文切换。如果mutexWoken未被设置,Unlock会唤醒等待队列中的下一个协程。

新旧值替换

接下来就是通过CAS操作,用上面我们构造的新的if来替换旧的值。

如果失败,即旧值被其他协程介入或者提前修改导致不符合预期,则将旧值更新为此刻的Mutex.State,并开启新的循环。

如果CAS替换旧值为新值替换成功,则进入最后一轮的二选一(因为前边我们进行了if old&mutexStarving进行mutexLocked置为1的操作,还要额外判断是否成功了。)如果当前goroutine加锁成功,则返回,如果失败,则将goroutine挂起添加到阻塞队列进行下一步的操作。

在这里插入图片描述

上锁成功
func (m *Mutex) lockSlow() {// ...for {// 自旋抢锁失败后处理 ...// new old 状态值更新 ...if atomic.CompareAndSwapInt32(&m.state, old, new) {// 额外校验一次是否加锁成功if old&(mutexLocked|mutexStarving) == 0 {break // 这是加锁成功的唯一出口}// ...} // ...}
}
  0000 (old)
& 0101 (mutexLocked | mutexStarving)----0000 (结果)

若旧值,注意是旧值,是未加锁状态且为正常模式,则意味说明加锁成功,返回即可。

这是正常模式下的一个正常加锁出口

旧值中锁未释放或者处于饥饿模式,则当前 goroutine 需要进入阻塞队列挂起。

阻塞挂起
func (m *Mutex) lockSlow() {// ...for {// 自旋抢锁失败后处理 ...// new old 状态值更新 ...if atomic.CompareAndSwapInt32(&m.state, old, new) {// 加锁成功后返回的逻辑分支 ...queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo, 1)// ...} // ...}
}

如果锁已经被占用或处于饥饿模式,代码会进入阻塞逻辑。首先,检查是否已经等待过锁(waitStartTime != 0)。如果没有等待过,记录当前时间到 waitStartTime,以便后续计算等待时间。然后,调用 runtime_SemacquireMutex 阻塞当前协程,直到锁被释放,如下图所示。

在这里插入图片描述
在阻塞后,检查当前协程是否等待了足够长的时间,从而进入饥饿模式(starving)。如果等待时间超过了阈值(starvationThresholdNs),将 starving 设置为 true,注意这个是局部状态的starving,而不是state里面的饥饿状态。这里就跟前面的if判断对应上了,也就是为什么会突然判断一个starving && old&mutexLocked!=0 的那个判断if。

接下来,重新读取锁的状态 old,也就是苏醒之前的状态old,检查是否处于饥饿模式(old & mutexStarving != 0)。如果处于饥饿模式,并且当前协程被唤醒,说明锁的所有权已经传递给了当前协程,但锁的状态可能不一致(mutexLocked 未设置,且当前协程仍被计为等待者)。在这种情况下,需要修复锁的状态:如果 old & (mutexLocked | mutexWoken) != 0 或者没有等待者(old >> mutexWaiterShift == 0),抛出异常,因为这是不一致的状态。

但是如果没有异常,那么因为是饥饿模式,并且我是被唤醒的,说明一定会把锁的所有权给我。

这里就是第二个加锁成功的出口,这是饥饿模式下的一个加锁成功的出口

delta 被初始化为 mutexLocked - 1<<mutexWaiterShift,这表示锁的状态变化量。mutexLocked 是锁的占用标志位,1<<mutexWaiterShift 表示等待队列中协程数量的变化。这个变化量用于后续更新锁的状态。

接下来,代码检查是否需要退出饥饿模式。如果当前协程不是饥饿状态(!starving)注意这里starving是或运算,只要之前的starving = starving || runtime....>1ms 这个代码置为了true,之后永远不会是false了,所以导致饥饿的G永远不会取消饥饿starving,只会是其他被唤醒的G才可能取消饥饿starving;或者等待队列中只有一个协程(old>>mutexWaiterShift == 1),则决定退出饥饿模式。

在这里插入图片描述
如果醒来不是饥饿模式,而是正常模式(要跟新进来的goroutine进行抢锁),则说明还需要进行下一轮循环,需要再抢一次锁,那么我们只需要如果当前协程被唤醒(awoke = true),重置自旋计数器 iter,以便在下一次尝试获取锁时重新开始自旋过程,也就是重新经历4轮CAS的过程。

sync.Unlock()方法

解锁主干方法

通过原子操作进行解锁,如果只有一个goroutine,那么直接返回即可,如果还有协程阻塞中,那么就需要进入unlockSlow的分支。

在这里插入图片描述

unlockSlow的三种情况

在这里插入图片描述

异常情况
	if (new+mutexLocked)&mutexLocked == 0 {fatal("sync: unlock of unlocked mutex")}

首先,代码检查当前锁的状态 new,确保锁确实处于加锁状态。这是通过检查 (new + mutexLocked) & mutexLocked == 0 来实现的。如果这个条件成立,说明锁未被占用,尝试解锁一个未加锁的互斥锁是非法的,因此调用 fatal 函数抛出错误。

正常模式
	if new&mutexStarving == 0 {old := newfor {// If there are no waiters or a goroutine has already// been woken or grabbed the lock, no need to wake anyone.// In starvation mode ownership is directly handed off from unlocking// goroutine to the next waiter. We are not part of this chain,// since we did not observe mutexStarving when we unlocked the mutex above.// So get off the way.if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// Grab the right to wake someone.new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}

如果锁不在饥饿模式下(new & mutexStarving == 0),代码进入以下逻辑:

首先将当前锁的状态存储在变量 old 中。

如果没有等待者(old >> mutexWaiterShift == 0),或者锁已经被占用(意味着我解锁之后又有新的goroutine加锁成功了,那我们就没必要继续管后续操作了,由新的占有锁的协程继续管理这个操作即可)、已经唤醒了某个协程(此时还有在尝试获取锁的goroutine存在,所以这样也没有必要去唤醒阻塞队列中的某个goroutine,因为已经有人在竞争这个锁了)、或者处于饥饿模式(意味这个锁新变为了饥饿模式)(old & (mutexLocked | mutexWoken | mutexStarving) != 0),则直接返回,无需唤醒其他协程。

否则,尝试通过原子操作 atomic.CompareAndSwapInt32 更新锁的状态,将等待队列中的协程数量减一,并设置 mutexWoken 标志位,表示已经唤醒了某个协程。

如果原子操作成功,调用 runtime_Semrelease 唤醒等待队列中的一个协程,然后返回。如果原子操作失败,重新读取锁的当前状态,并继续循环。

饥饿模式
 else {// Starving mode: handoff mutex ownership to the next waiter, and yield// our time slice so that the next waiter can start to run immediately.// Note: mutexLocked is not set, the waiter will set it after wakeup.// But mutex is still considered locked if mutexStarving is set,// so new coming goroutines won't acquire it.runtime_Semrelease(&m.sema, true, 1)}

如果锁处于饥饿模式(new & mutexStarving != 0),代码进入以下逻辑:

直接移交锁的所有权:在饥饿模式下,锁的所有权直接从当前协程移交到等待队列中的下一个协程。

调用 runtime_Semrelease:调用 runtime_Semrelease 唤醒等待队列中的下一个协程,并且设置 true 参数,表示当前协程应该放弃时间片,让下一个协程立即运行。

值得注意的是,这是一个很巧的点,在饥饿模式下,mutexLocked 标志位不会被设置,因为下一个协程将在唤醒后自己设置该标志位。但是,锁仍然被认为是被占用的,因为 mutexStarving 标志位被设置了,这会阻止新来的协程获取锁。 只能说设计的思想太巧妙了。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词