前言
golang 的sync包下有种锁,一种是sync.RWMutex,另一种是sync.Mutex,本文将讲解下sync.Mutex是如何实现的?如何避免读/写 饥饿问题?就让我们带着这些问题来看源码是如何实现的
概述
// Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency.
以上摘自golang源码中对mutex的注释,我觉得用来概括解释非常清晰
Mutex作为并发原语中的锁,涉及锁的公平性(即公平锁和非公平锁,通常非公平锁性能更加),go中叫做两种模式:正常和饥饿。
正常模式下,未获取到锁的goroutine 会在waiter中按照FIFO 方式在队列中排队。当锁被释放,会唤醒waiter 中的goroutine,它会和新来的goroutine(如果释放锁时,刚好有新的协程来获取锁)进行竞争锁,新来的goroutine有更大的优势获取到锁,因为他们正在CPU执行。那么刚刚在waiter中唤醒的goroutine 由于没有获取到锁(白跑一趟),那么它就会被放到waiter的队列头.当waiter 中的goroutine 超过1s 没有获取到锁,会将mutex 置为饥饿模式。
饥饿模式下, 在释放锁的过程,新来的goroutine不会参与竞争锁,直接由waiter 中队头的goroutine获取锁,如果队头的goroutine 的等待时间小于1ms,说明此时已经没有协程处于饥饿,将切换回正常模式。
源码
const(
  mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
)
type Mutex struct {
   state int32
   sema  uint32
}
state状态中低三位用于标识锁的状态其他高位用于记录waiter的数量, state 可以表示为:waiterNum|mutexStarving|mutexWoken|mutexLocked 
sema 是个FIFO队列,用于goroutine 作为waiter 在这里排队.
获取锁
没有竞争,直接通过CAS获取锁
func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      if race.Enabled {
         race.Acquire(unsafe.Pointer(m))
      }
      return
   }
   // Slow path (outlined so that the fast path can be inlined)
   m.lockSlow()
}
有竞争,走lockSlow
正常模式
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // Active spinning makes sense.
   // Try to set mutexWoken flag to inform Unlock
   // to not wake other blocked goroutines.
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
      atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
      awoke = true
   }
   runtime_doSpin()
   iter++
   old = m.state
   continue
}
正常模式下,这里通过自旋等待锁的释放,同时会将state 置为mutexWoken,用于锁在释放是能否将锁资源移交给自旋锁的协程竞争锁
if atomic.CompareAndSwapInt32(&m.state, old, new) {
   if old&(mutexLocked|mutexStarving) == 0 {
     // 这里是正常模式下,线程唤醒后获取到锁的出口
      break // locked the mutex with CAS //线程自旋后,原来持有锁的线程释放锁后,state的mutexLocked 或置于0。然后,本次CAS 成功,获取到锁
   }
   // If we were already waiting before, queue at the front of the queue.  //没有获取到锁,若是之前已经在waiter中,则放入队首,否则放入队尾
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
      waitStartTime = runtime_nanotime()
   }
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)  //每次有锁释放,会唤醒waiter 协程,唤醒点在这里
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
    ...
   awoke = true
   iter = 0
} 
- 线程自旋后,原来持有锁的线程释放锁后,state的mutexLocked 或置于0。然后,本次CAS 成功,获取到锁
 - 若自己是waiter唤醒后,但是由没有获取到锁,则放入waiter 队首,否则放入队尾
 - 若等待时间超过1s ,将mutex 切换为饥饿模式
 
饥饿模式
new := old
...
if atomic.CompareAndSwapInt32(&m.state, old, new) {
   ...
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   if old&mutexStarving != 0 {
      ...
      delta := int32(mutexLocked - 1<<mutexWaiterShift)
      if !starving || old>>mutexWaiterShift == 1 { 
         // Exit starvation mode.
         // Critical to do it here and consider wait time.
         // Starvation mode is so inefficient, that two goroutines
         // can go lock-step infinitely once they switch mutex
         // to starvation mode.
         delta -= mutexStarving
      }
      atomic.AddInt32(&m.state, delta)
      break  //这里是锁在饥饿条件下,协程被唤醒后(获取到锁)的出口
   }
   awoke = true
   iter = 0
} else {
   old = m.state
}
- 若协程已经超过1ms 没有获取到锁,则切换到饥饿模式(
runtime_nanotime()-waitStartTime > starvationThresholdNs). - 若waiter 队列只剩本协程,那么退出饥饿模式(
old>>mutexWaiterShift == 1) 
释放锁
没有锁竞争,直接CAS 释放锁资源
func (m *Mutex) Unlock() {
   // Fast path: drop lock bit.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // Outlined slow path to allow inlining the fast path.
      // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
      m.unlockSlow(new)
   }
}
有竞争,走unlockSlow
正常模式
old := new
for {
   // If there are no waiters or a goroutine has already
   // been woken or grabbed the lock, no need to wake anyone.
   // In starvation mode ownership is directly handed off from unlocking
   // goroutine to the next waiter. We are not part of this chain,
   // since we did not observe mutexStarving when we unlocked the mutex above.
   // So get off the way.
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
      return
   }
   // Grab the right to wake someone.
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
      runtime_Semrelease(&m.sema, false, 1)
      return
   }
   old = m.state
}
从 waiter中唤起一个,与新来的goroutine 一起竞争锁资源
饥饿模式
// Starving mode: handoff mutex ownership to the next waiter, and yield
// our time slice so that the next waiter can start to run immediately.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
runtime_Semrelease(&m.sema, true, 1)
直接从waiter中取出等待队列的第一个饥饿的协程来获取锁
参考文献
- https://juejin.cn/post/695897...
 - https://segmentfault.com/a/11...
 
