Linux mutex implementation
The spinlock is the most widely used locking mechanism in the Linux kernel (it is described in detail in my article 《linux spinlock自旋锁实现》). On multiprocessor and NUMA systems, however, even the queued spinlock still has a fairly serious problem. In a system with heavy lock contention, every thread spinning for the lock spins on the same shared global variable, and both acquiring and releasing the lock modify that variable, so cache coherence keeps invalidating the corresponding cache line on every spinning CPU. Under contention this produces severe CPU cache line bouncing: the cache line is invalidated over and over again on multiple CPUs, which significantly degrades overall system performance.
The MCS lock is an optimization of the spinlock, named after its two inventors, Mellor-Crummey and Scott. The MCS algorithm solves the problem described above and greatly reduces CPU cache line bouncing. Its core idea is that each lock waiter spins only on a variable local to its own CPU rather than on a global one. Although the MCS algorithm was designed for spinlocks, as of Linux 4.0 the kernel still does not use it for spinlocks; it is, however, used in the mutex.
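To make the idea concrete, below is a minimal, self-contained MCS-style lock written with C11 atomics. It is only an illustrative sketch of the algorithm, not the kernel's OSQ code: the type and function names (mcs_lock, mcs_node, mcs_lock_acquire, mcs_lock_release) are made up, and it omits the kernel's per-CPU node storage and the unqueue-on-reschedule handling discussed later.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Each waiter brings its own node and spins only on its own 'locked' flag,
 * so contention does not keep bouncing a single shared cache line. */
struct mcs_node {
    struct mcs_node *_Atomic next;
    atomic_bool locked;              /* set by the predecessor when the lock is handed over */
};

struct mcs_lock {
    struct mcs_node *_Atomic tail;   /* last node in the queue, NULL when the lock is free */
};

static void mcs_lock_acquire(struct mcs_lock *lock, struct mcs_node *self)
{
    struct mcs_node *prev;

    self->next = NULL;
    self->locked = false;

    /* append our node at the tail of the queue */
    prev = atomic_exchange(&lock->tail, self);
    if (!prev)
        return;                      /* queue was empty: lock acquired immediately */

    prev->next = self;
    while (!self->locked)
        ;                            /* spin on our OWN node, not on a global variable */
}

static void mcs_lock_release(struct mcs_lock *lock, struct mcs_node *self)
{
    struct mcs_node *next = self->next;

    if (!next) {
        /* no visible successor: try to swing the tail back to NULL */
        struct mcs_node *expected = self;

        if (atomic_compare_exchange_strong(&lock->tail, &expected,
                                           (struct mcs_node *)NULL))
            return;
        /* a new waiter is linking itself in; wait until it becomes visible */
        while (!(next = self->next))
            ;
    }
    next->locked = true;             /* hand the lock to the successor */
}

The kernel's OSQ variant described below follows the same structure, but stores one node per CPU, encodes the queue tail as a CPU number instead of a pointer, and lets a spinner unqueue itself when it needs to reschedule.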
1. Data structures
Let's look at the definition of the mutex data structure:
include/linux/mutex.h
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
    /* 1: unlocked, 0: locked, negative: locked, possible waiters
     * (atomic count; 1 means nobody holds the lock, 0 means it is held,
     *  negative means it is held and there are waiters on the wait list) */
    atomic_t count;
    /* protects the wait_list sleep queue */
    spinlock_t wait_lock;
    /* all tasks sleeping on this lock; a task that fails to acquire the
     * lock goes to sleep on this list */
    struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
    /* with CONFIG_MUTEX_SPIN_ON_OWNER=y: points to the task_struct of the
     * current lock owner */
    struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    /* with CONFIG_MUTEX_SPIN_ON_OWNER=y: MCS lock queue; OSQ ("optimistic
     * spin queue") is a concrete implementation of the MCS lock mechanism */
    struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
    void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
    struct lockdep_map dep_map;
#endif
};

/*
 * This is the control structure for tasks blocked on mutex,
 * which resides on the blocked task's kernel stack:
 */
struct mutex_waiter {
    struct list_head list;
    struct task_struct *task;
#ifdef CONFIG_DEBUG_MUTEXES
    void *magic;
#endif
};
2. The MCS algorithm
The kernel's MCS lock was first implemented by Waiman Long in Linux 3.10 and was then refined by other community developers into today's osq_lock, so the OSQ lock can be regarded as a concrete implementation of the MCS lock mechanism. An MCS lock is essentially a spinlock built on a linked list. The OSQ implementation needs two data structures:
#define OSQ_UNLOCKED_VAL (0)
#define OSQ_LOCK_UNLOCKED { ATOMIC_INIT(OSQ_UNLOCKED_VAL) }

struct optimistic_spin_queue {
    /*
     * Stores an encoded value of the CPU # of the tail node in the queue.
     * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
     */
    atomic_t tail;
};

/*
 * An MCS like lock especially tailored for optimistic spinning for sleeping
 * lock implementations (mutex, rwsem, etc).
 */
struct optimistic_spin_node {
    struct optimistic_spin_node *next, *prev;
    int locked; /* 1 if lock acquired */
    int cpu;    /* encoded CPU # + 1 value */
};

/*
 * An MCS like lock especially tailored for optimistic spinning for sleeping
 * lock implementations (mutex, rwsem, etc).
 *
 * Using a single mcs node per CPU is safe because sleeping locks should not be
 * called from interrupt context and we have preemption disabled while
 * spinning.
 */
/* per-cpu variable osq_node */
static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
The MCS lock queue (optimistic_spin_queue) is initialized by osq_lock_init(), which is called when the mutex itself is initialized.
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
# define mutex_init(mutex) \
do { \
    static struct lock_class_key __key; \
 \
    __mutex_init((mutex), #mutex, &__key); \
} while (0)

void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
    atomic_set(&lock->count, 1);      /* 1: unlocked */
    spin_lock_init(&lock->wait_lock);
    INIT_LIST_HEAD(&lock->wait_list);
    mutex_clear_owner(lock);          /* lock->owner = NULL */
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
    /* initialize the MCS lock */
    osq_lock_init(&lock->osq);        /* atomic_set(&lock->tail, 0); */
#endif
    /* no-op unless mutex debugging is enabled */
    debug_mutex_init(lock, name, key);
}

static inline void osq_lock_init(struct optimistic_spin_queue *lock)
{
    atomic_set(&lock->tail, OSQ_UNLOCKED_VAL);
}
osq_lock() is used to acquire the MCS lock:
bool osq_lock(struct optimistic_spin_queue *lock)
{
    /* node points to the current CPU's struct optimistic_spin_node */
    struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
    struct optimistic_spin_node *prev, *next;
    /* encoded CPU number */
    int curr = encode_cpu(smp_processor_id());
    int old;

    node->locked = 0;
    node->next = NULL;
    node->cpu = curr;

    /*
     * Atomically exchange lock->tail with the current CPU number (curr) and
     * return the old value; lock->tail records the CPU number of the last
     * node in the wait queue.
     */
    old = atomic_xchg(&lock->tail, curr);
    /*
     * If the old value of lock->tail was OSQ_UNLOCKED_VAL (0), nobody held
     * the lock yet; after the exchange lock->tail is the current CPU number,
     * i.e. this CPU has acquired the lock.
     */
    if (old == OSQ_UNLOCKED_VAL)
        return true;

    /* the node of the CPU that was previously the tail of the queue */
    prev = decode_cpu(old);
    /* queue our node behind prev and wait */
    node->prev = prev;
    ACCESS_ONCE(prev->next) = node;

    /*
     * Normally @prev is untouchable after the above store; because at that
     * moment unlock can proceed and wipe the node element from stack.
     *
     * However, since our nodes are static per-cpu storage, we're
     * guaranteed their existence -- this allows us to apply
     * cmpxchg in an attempt to undo our queueing.
     */

    /*
     * Spin until node->locked becomes 1: the predecessor (prev) sets its
     * successor's locked field to 1 when it releases the lock. In the ideal
     * case the predecessor releases the lock, the successor acquires it,
     * stops spinning and returns true.
     */
    while (!ACCESS_ONCE(node->locked)) {   /* spin on the local CPU's node->locked */
        /*
         * If we need to reschedule bail... so we can block.
         *
         * If a higher-priority task preempts us or the scheduler asks us to
         * give up the CPU while spinning, abandon the spin, leave the MCS
         * list and jump to the unqueue path, which removes the node from the
         * list. Removing the node when a reschedule is needed is essential:
         * it guarantees that the same node can never appear on more than one
         * MCS list. Since the node is a per-cpu variable, having it on
         * several lists at once would mean that a 0->1 transition of its
         * locked field on one list corrupts all the others.
         */
        if (need_resched())
            goto unqueue;

        cpu_relax_lowlatency();   /* barrier() */
    }
    return true;

unqueue:
    /*
     * Removing a node from the MCS list takes three steps:
     * (a) detach the predecessor's (prev) next pointer from us;
     * (b) detach our own (node) next pointer and find our definite
     *     successor node (next);
     * (c) link them together: prev->next = next, next->prev = prev.
     */

    /*
     * Step - A -- stabilize @prev
     *
     * Undo our @prev->next assignment; this will make @prev's
     * unlock()/unqueue() wait for a next pointer since @lock points to us
     * (or later).
     */
    for (;;) {
        if (prev->next == node &&
            cmpxchg(&prev->next, node, NULL) == node)
            break;

        /*
         * We can only fail the cmpxchg() racing against an unlock(),
         * in which case we should observe @node->locked becoming true.
         *
         * (An osq_unlock() may have run meanwhile and set prev->next to
         *  NULL, so check whether node->locked already became true.)
         */
        if (smp_load_acquire(&node->locked))
            return true;

        cpu_relax_lowlatency();   /* barrier() */

        /*
         * Or we race against a concurrent unqueue()'s step-B, in which
         * case its step-C will write us a new @node->prev pointer.
         */
        /* re-read node->prev */
        prev = ACCESS_ONCE(node->prev);
    }

    /*
     * Step - B -- stabilize @next
     *
     * Similar to unlock(), wait for @node->next or move @lock from @node
     * back to @prev.
     *
     * (Unlocking or unqueueing means the node is leaving the queue, so a
     *  stable node->next pointer is needed.)
     */
    next = osq_wait_next(lock, node, prev);
    if (!next)
        return false;

    /*
     * Step - C -- unlink
     *
     * @prev is stable because its still waiting for a new @prev->next
     * pointer, @next is stable because our @node->next pointer is NULL and
     * it will wait in Step-A.
     */
    ACCESS_ONCE(next->prev) = prev;
    ACCESS_ONCE(prev->next) = next;

    /* queueing was undone, the lock was not acquired */
    return false;
}
/*
 * We use the value 0 to represent "no CPU", thus the encoded value
 * will be the CPU number incremented by 1.
 */
static inline int encode_cpu(int cpu_nr)
{
    /* 0 means "no CPU", 1 means CPU 0, and so on */
    return cpu_nr + 1;
}

static inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val)
{
    int cpu_nr = encoded_cpu_val - 1;

    /* return the per-cpu osq_node of cpu_nr */
    return per_cpu_ptr(&osq_node, cpu_nr);
}

/*
 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */
/*
 * Unlocking or unqueueing means the node is leaving the queue, so a stable
 * node->next pointer is needed.
 */
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
              struct optimistic_spin_node *node,
              struct optimistic_spin_node *prev)
{
    struct optimistic_spin_node *next = NULL;
    /* current CPU number (encoded) */
    int curr = encode_cpu(smp_processor_id());
    int old;

    /*
     * If there is a prev node in queue, then the 'old' value will be
     * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
     * we're currently last in queue, then the queue will then become empty.
     */
    old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

    for (;;) {
        /* our node is the last one in the queue */
        if (atomic_read(&lock->tail) == curr &&
            atomic_cmpxchg(&lock->tail, curr, old) == curr) {
            /*
             * We were the last queued, we moved @lock back. @prev
             * will now observe @lock and will complete its
             * unlock()/unqueue().
             */
            break;
        }

        /*
         * We must xchg() the @node->next value, because if we were to
         * leave it in, a concurrent unlock()/unqueue() from
         * @node->next might complete Step-A and think its @prev is
         * still valid.
         *
         * If the concurrent unlock()/unqueue() wins the race, we'll
         * wait for either @lock to point to us, through its Step-B, or
         * wait for a new @node->next from its Step-C.
         */
        if (node->next) {
            /* return node->next and set node->next to NULL */
            next = xchg(&node->next, NULL);
            if (next)
                break;
        }

        cpu_relax_lowlatency();
    }

    return next;
}

/* release the MCS lock */
void osq_unlock(struct optimistic_spin_queue *lock)
{
    struct optimistic_spin_node *node, *next;
    /* current CPU number (encoded) */
    int curr = encode_cpu(smp_processor_id());

    /*
     * Fast path for the uncontended case.
     *
     * If lock->tail still points at curr, our node is the last one in the
     * queue and nobody is waiting behind us, so simply reset lock->tail to
     * OSQ_UNLOCKED_VAL and release quickly.
     */
    if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
        return;

    /*
     * Second most likely case.
     *
     * Other nodes are waiting in the queue, so the lock must be handed to
     * the successor.
     */
    /* the current CPU's struct optimistic_spin_node */
    node = this_cpu_ptr(&osq_node);
    /* return node->next and set node->next to NULL */
    next = xchg(&node->next, NULL);
    if (next) {
        /* set next->locked to 1, i.e. pass the lock to next */
        ACCESS_ONCE(next->locked) = 1;
        return;
    }

    /* no visible successor yet, try again to obtain node->next */
    next = osq_wait_next(lock, node, NULL);
    if (next)
        ACCESS_ONCE(next->locked) = 1;
}

#define cpu_relax_lowlatency() cpu_relax()
#define cpu_relax() barrier()
3. Implementation of the mutex lock
A mutex can be initialized in two ways: statically with the DEFINE_MUTEX macro, or dynamically in kernel code through the mutex_init() interface.
#define DEFINE_MUTEX(mutexname) \
    struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

#define __MUTEX_INITIALIZER(lockname) \
    { .count = ATOMIC_INIT(1) \
    , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
    , .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
    __DEBUG_MUTEX_INITIALIZER(lockname) \
    __DEP_MAP_MUTEX_INITIALIZER(lockname) }

void mutex_lock(struct mutex *lock);
int __must_check mutex_lock_interruptible(struct mutex *lock);
int __must_check mutex_lock_killable(struct mutex *lock);
int mutex_trylock(struct mutex *lock);
void mutex_unlock(struct mutex *lock);
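As a quick illustration of the API above, here is a hypothetical driver-style snippet showing both initialization styles together with lock/unlock; the names my_device, global_config_lock, my_device_create and my_dev_record_write are invented for this example.

#include <linux/mutex.h>
#include <linux/slab.h>

static DEFINE_MUTEX(global_config_lock);    /* static initialization */

struct my_device {
    struct mutex io_lock;
    unsigned long write_count;
};

static struct my_device *my_device_create(void)
{
    struct my_device *dev = kzalloc(sizeof(*dev), GFP_KERNEL);

    if (!dev)
        return NULL;
    mutex_init(&dev->io_lock);              /* dynamic initialization */
    return dev;
}

static void my_dev_record_write(struct my_device *dev)
{
    mutex_lock(&dev->io_lock);              /* may sleep while waiting */
    dev->write_count++;                     /* critical section */
    mutex_unlock(&dev->io_lock);            /* only the owner may unlock */
}

Note that a mutex embedded in dynamically allocated memory must be initialized with mutex_init() before first use and must not be freed while held, per the constraints listed earlier.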
Now let's look at the implementation of mutex_lock():
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that acquired it.
 * Recursive locking is not allowed. The task may not exit without first
 * unlocking the mutex. Also, kernel memory where the mutex resides must
 * not be freed with the mutex still locked. The mutex must first be
 * initialized (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 *   checks that will enforce the restrictions and will also do
 *   deadlock debugging. )
 *
 * This function is similar to (but not equivalent to) down().
 */
/*
 * Acquire the mutex: if lock->count is 1 (unlocked), take the lock directly
 * (lock->count = 0 && lock->owner = current).
 */
void __sched mutex_lock(struct mutex *lock)
{
    might_sleep();
    /*
     * The locking fastpath is the 1->0 transition from
     * 'unlocked' into 'locked' state.
     *
     * Atomically set count to 0 or -1 (locked) and return its old value.
     * If the old value was 1, the lock was free and has now been acquired
     * (count is already 0); otherwise fall back to __mutex_lock_slowpath.
     */
    __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
    /* lock->owner = current */
    mutex_set_owner(lock);
}

/* mutex lock slowpath */
__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
    struct mutex *lock = container_of(lock_count, struct mutex, count);

    /* TASK_UNINTERRUPTIBLE: uninterruptible sleep state */
    __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                        NULL, _RET_IP_ /* address of the call site */, NULL, 0);
}

/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    /* current task */
    struct task_struct *task = current;
    struct mutex_waiter waiter;
    unsigned long flags;
    int ret;

    /* disable kernel preemption */
    preempt_disable();
    mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

    /*
     * Optimistic spinning: returns true if the mutex was acquired, false
     * otherwise (either the lock owner went to sleep, or this task needed to
     * be rescheduled; when it runs again it has already been through a
     * reschedule).
     */
    if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
        /* got the lock, yay! */
        preempt_enable();   /* re-enable preemption */
        return 0;
    }

    /* the optimistic spin failed, fall back to sleeping */
    spin_lock_mutex(&lock->wait_lock, flags);   /* spin_lock */

    /*
     * Once more, try to acquire the lock. Only try-lock the mutex if
     * it is unlocked to reduce unnecessary xchg() operations.
     */
    /* the lock is not held: try to take it */
    if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
        goto skip_wait;

    /* no-ops unless mutex debugging is enabled */
    debug_mutex_lock_common(lock, &waiter);
    debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

    /* add waiting tasks to the end of the waitqueue (FIFO): */
    list_add_tail(&waiter.list, &lock->wait_list);
    waiter.task = task;

    lock_contended(&lock->dep_map, ip);

    for (;;) {
        /*
         * Lets try to take the lock again - this is needed even if
         * we get here for the first time (shortly after failing to
         * acquire the lock), to make sure that we get a wakeup once
         * it's unlocked. Later on, if we sleep, this is the
         * operation that gives us the lock. We xchg it to -1, so
         * that when we release the lock, we properly wake up the
         * other waiters. We only attempt the xchg if the count is
         * non-negative in order to avoid unnecessary xchg operations:
         *
         * (Both tasks that tried to take the lock while the owner slept and
         *  tasks that had to be rescheduled during acquisition test and
         *  modify the global lock->count here.)
         */
        if (atomic_read(&lock->count) >= 0 &&
            (atomic_xchg(&lock->count, -1) == 1))
            break;

        /* failed to take the lock */
        /*
         * got a signal? (This code gets eliminated in the
         * TASK_UNINTERRUPTIBLE case.)
         */
        if (unlikely(signal_pending_state(state, task))) {
            ret = -EINTR;
            goto err;
        }

        if (use_ww_ctx && ww_ctx->acquired > 0) {
            ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
            if (ret)
                goto err;
        }

        /*
         * Much like the semaphore slowpath: set the task state to
         * interruptible (TASK_INTERRUPTIBLE) or uninterruptible
         * (TASK_UNINTERRUPTIBLE) sleep.
         */
        __set_task_state(task, state);

        /* didn't get the lock, go to sleep: */
        /* drop the spinlock before sleeping (this also re-enables preemption) */
        spin_unlock_mutex(&lock->wait_lock, flags);
        /* sleep; the preempt_disable part runs after the task wakes up again
         * (schedule + preempt_disable) */
        schedule_preempt_disabled();
        /* woken up again: retake the spinlock (this disables preemption) */
        spin_lock_mutex(&lock->wait_lock, flags);
    }

    /* the mutex was acquired: set the task state back to RUNNING */
    __set_task_state(task, TASK_RUNNING);

    /* remove the waiter from the list */
    mutex_remove_waiter(lock, &waiter, current_thread_info());
    /* set it to 0 if there are no waiters left: */
    if (likely(list_empty(&lock->wait_list)))
        atomic_set(&lock->count, 0);   /* no more waiters, count = 0 */
    debug_mutex_free_waiter(&waiter);

skip_wait:
    /* got the lock - cleanup and rejoice! */
    lock_acquired(&lock->dep_map, ip);
    /* lock->owner = current, record the lock owner */
    mutex_set_owner(lock);

    if (use_ww_ctx) {
        struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);

        ww_mutex_set_context_slowpath(ww, ww_ctx);
    }

    /* spin_unlock */
    spin_unlock_mutex(&lock->wait_lock, flags);
    /* allow kernel preemption again */
    preempt_enable();
    return 0;

err:
    mutex_remove_waiter(lock, &waiter, task_thread_info(task));
    spin_unlock_mutex(&lock->wait_lock, flags);
    debug_mutex_free_waiter(&waiter);
    mutex_release(&lock->dep_map, 1, ip);
    preempt_enable();
    return ret;
}

/*
 * Optimistic spinning.
 *
 * We try to spin for acquisition when we find that the lock owner
 * is currently running on a (different) CPU and while we don't
 * need to reschedule. The rationale is that if the lock owner is
 * running, it is likely to release the lock soon.
 * (If the owner is running on another CPU it will probably release the lock
 *  shortly, so spin as long as the spinning task itself is not about to be
 *  scheduled out.)
 *
 * Since this needs the lock owner, and this mutex implementation
 * doesn't track the owner atomically in the lock field, we need to
 * track it non-atomically.
 *
 * We can't do this for DEBUG_MUTEXES because that relies on wait_lock
 * to serialize everything.
 *
 * The mutex spinners are queued up using MCS lock so that only one
 * spinner can compete for the mutex. However, if mutex spinning isn't
 * going to happen, there is no point in going through the lock/unlock
 * overhead.
 *
 * Returns true when the lock was taken, otherwise false, indicating
 * that we need to jump to the slowpath and sleep.
 */
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    /* current task */
    struct task_struct *task = current;

    /*
     * Check whether spinning is worthwhile at all. mutex_can_spin_on_owner()
     * returns 0 if the current task needs to be rescheduled or if the lock
     * owner is currently sleeping; spinning only has value while the owner
     * is executing its critical section, necessarily on a different CPU from
     * the current task.
     */
    if (!mutex_can_spin_on_owner(lock))
        goto done;

    /* the current task is allowed to spin-wait */
    /*
     * In order to avoid a stampede of mutex spinners trying to
     * acquire the mutex all at once, the spinners need to take a
     * MCS (queued) lock first before spinning on the owner field.
     *
     * Acquire the MCS (OSQ) lock, spinning locally if it is contended;
     * returns false on failure, true on success. Why take an MCS lock here
     * at all? Because we are about to spin waiting for the mutex to be
     * released, and we do not want many tasks spinning on it at the same
     * time - that would cause severe CPU cache line bouncing. So every task
     * waiting for the mutex is queued on the OSQ lock, and only the first
     * waiter in that queue actually spins on the mutex owner.
     */
    if (!osq_lock(&lock->osq))
        goto done;

    /*
     * The MCS lock was acquired: keep spinning, watching whether the owner
     * has released the mutex, and try to take the mutex when it does.
     */
    while (true) {
        struct task_struct *owner;

        if (use_ww_ctx && ww_ctx->acquired > 0) {
            struct ww_mutex *ww;

            ww = container_of(lock, struct ww_mutex, base);
            /*
             * If ww->ctx is set the contents are undefined, only
             * by acquiring wait_lock there is a guarantee that
             * they are not invalid when reading.
             *
             * As such, when deadlock detection needs to be
             * performed the optimistic spinning cannot be done.
             */
            if (ACCESS_ONCE(ww->ctx))
                break;
        }

        /*
         * If there's an owner, wait for it to either
         * release the lock or go to sleep.
         */
        /* current owner of the mutex */
        owner = ACCESS_ONCE(lock->owner);
        /*
         * mutex_spin_on_owner(): spin while the owner is running, until the
         * owner either releases the lock (lock->owner != owner) or goes to
         * sleep; it returns lock->owner == NULL. If the lock is still held
         * and the owner went to sleep after some spinning, break out of the
         * while loop and stop spinning.
         */
        if (owner && !mutex_spin_on_owner(lock, owner))
            break;

        /* the lock was released (possibly after some spinning) */
        /* Try to acquire the mutex if it is unlocked. */
        /* the lock is free: try to take it, i.e. set lock->count to 0 */
        if (mutex_try_to_acquire(lock)) {
            lock_acquired(&lock->dep_map, ip);

            if (use_ww_ctx) {
                struct ww_mutex *ww;

                ww = container_of(lock, struct ww_mutex, base);
                ww_mutex_set_context_fastpath(ww, ww_ctx);
            }

            /* record the current task as the owner */
            mutex_set_owner(lock);   /* lock->owner = current; */
            /* release the MCS lock */
            osq_unlock(&lock->osq);
            return true;
        }

        /*
         * When there's no owner, we might have preempted between the
         * owner acquiring the lock and setting the owner field. If
         * we're an RT task that will live-lock because we won't let
         * the owner complete.
         *
         * (owner may be NULL because the holder was preempted between taking
         *  the lock and setting the owner field; if we are an RT task or need
         *  rescheduling, stop spinning.)
         */
        if (!owner && (need_resched() || rt_task(task)))
            break;

        /*
         * The cpu_relax() call is a compiler barrier which forces
         * everything in this loop to be re-loaded. We don't need
         * memory barriers as we'll eventually observe the right
         * values at the cost of a few extra spins.
         */
        /* barrier(): make the loop re-read the variables on each iteration */
        cpu_relax_lowlatency();
    }

    /*
     * Release the MCS lock outside the while loop (either the owner went to
     * sleep or this task needs to be rescheduled).
     */
    osq_unlock(&lock->osq);

done:
    /*
     * If we fell out of the spin path because of need_resched(),
     * reschedule now, before we try-lock the mutex. This avoids getting
     * scheduled out right after we obtained the mutex.
     */
    if (need_resched()) {
        /*
         * We _should_ have TASK_RUNNING here, but just in case
         * we do not, make it so, otherwise we might get stuck.
         */
        __set_current_state(TASK_RUNNING);
        schedule_preempt_disabled();   /* schedule + preempt_disable */
    }

    /* fall back to the sleeping slowpath */
    return false;
}

/*
 * Initial check for entering the mutex spinning loop: is it worth trying to
 * spin-wait at all?
 */
static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
    struct task_struct *owner;
    int retval = 1;   /* default: spinning is allowed */

    /* this task needs to be rescheduled, don't spin */
    if (need_resched())
        return 0;

    rcu_read_lock();
    /*
     * lock->owner points to the task_struct of the current lock holder
     * (NULL if the lock is free). task_struct->on_cpu == 1 means the owner
     * is running, i.e. executing inside its critical section.
     */
    owner = ACCESS_ONCE(lock->owner);
    if (owner)
        /*
         * Whether the owner is currently running or sleeping. If on_cpu
         * drops to 0 right after this check, we merely waste a little time
         * spinning.
         */
        retval = owner->on_cpu;
    rcu_read_unlock();
    /*
     * if lock->owner is not set, the mutex owner may have just acquired
     * it and not set the owner yet or the mutex has been released.
     */
    return retval;
}

/*
 * Look out! "owner" is an entirely speculative pointer
 * access and not reliable.
 */
static noinline int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
    rcu_read_lock();
    /*
     * owner_running(): returns false if the owner has released the lock
     * (lock->owner != owner), otherwise returns owner->on_cpu.
     */
    while (owner_running(lock, owner)) {
        /* the mutex owner is still running */
        if (need_resched())
            break;

        cpu_relax_lowlatency();
    }
    rcu_read_unlock();

    /*
     * The loop above is left either because the owner released the lock
     * (lock->owner != owner) or because the owner went to sleep.
     *
     * We break out the loop above on need_resched() and when the
     * owner changed, which is a sign for heavy contention. Return
     * success only when lock->owner is NULL.
     */
    return lock->owner == NULL;
}

/* returns false if lock->owner != owner, otherwise returns owner->on_cpu */
static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
{
    if (lock->owner != owner)
        return false;

    /*
     * Ensure we emit the owner->on_cpu, dereference _after_ checking
     * lock->owner still matches owner, if that fails, owner might
     * point to free()d memory, if it still matches, the rcu_read_lock()
     * ensures the memory stays valid.
     */
    barrier();

    return owner->on_cpu;
}

include/asm-generic/mutex-xchg.h
/**
 * __mutex_fastpath_lock - try to take the lock by moving the count from 1 to a 0 value
 * @count: pointer of type atomic_t
 * @fail_fn: function to call if the original value was not 1
 *
 * Change the count from 1 to a value lower than 1, and call <fail_fn> if it
 * wasn't 1 originally. This function MUST leave the value lower than 1
 * even when the "1" assertion wasn't true.
 */
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    /*
     * Atomically set count to 0 (locked) and return its old value. If the
     * old value was 1, the lock was free and has now been acquired (count
     * is already 0); otherwise fall through to the handling below.
     */
    if (unlikely(atomic_xchg(count, 0) != 1))
        /*
         * We failed to acquire the lock, so mark it contended
         * to ensure that any waiting tasks are woken up by the
         * unlock slow path.
         *
         * The lock is held by another task and we will have to queue up, so
         * set count to -1 (held, with waiters). If the old value turns out
         * to be 1 after all, the lock has just been released and we acquired
         * it here (count is now -1); otherwise call fail_fn, i.e. the lock
         * slowpath.
         */
        if (likely(atomic_xchg(count, -1) != 1))
            fail_fn(count);
}

/**
 * __mutex_fastpath_unlock - try to promote the mutex from 0 to 1
 * @count: pointer of type atomic_t
 * @fail_fn: function to call if the original value was not 0
 *
 * try to promote the mutex from 0 to 1. if it wasn't 0, call <function>
 * In the failure case, this function is allowed to either set the value to
 * 1, or to set it to a value lower than one.
 * If the implementation sets it to a value of lower than one, the
 * __mutex_slowpath_needs_to_unlock() macro needs to return 1, it needs
 * to return 0 otherwise.
 */
static inline void __mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    /*
     * Atomically set lock->count to 1 (unlocked). If the old value was 0,
     * no other task is waiting for the lock and we simply return; if it was
     * not 0, there may still be waiters, so take the slowpath.
     */
    if (unlikely(atomic_xchg(count, 1) != 0))
        fail_fn(count);
}
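To make the count transitions of these fastpath helpers concrete, here is a hypothetical trace with two tasks A and B (one possible interleaving, assuming the asm-generic xchg-based implementation shown above):

/*
 *   initial state                          count = 1   (unlocked)
 *   A: mutex_lock,    xchg(count, 0)  -> old 1:  count = 0, A owns the lock
 *   B: mutex_lock,    xchg(count, 0)  -> old 0:  failed
 *   B:                xchg(count, -1) -> old 0:  still failed, count = -1,
 *                      B enters __mutex_lock_slowpath() and sleeps on wait_list
 *   A: mutex_unlock,  xchg(count, 1)  -> old -1: waiters possible, count = 1,
 *                      A enters __mutex_unlock_slowpath() and wakes B
 *   B: wakes up,      xchg(count, -1) -> old 1:  B owns the lock, count = -1
 *   B: wait_list is now empty, so count is set back to 0
 */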
Next, the implementation of mutex_unlock():
/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
/*
 * Release the mutex. Note that the release path does not touch the MCS lock
 * at all; the OSQ is only used on the acquisition side.
 */
void __sched mutex_unlock(struct mutex *lock)
{
    /*
     * The unlocking fastpath is the 0->1 transition from 'locked'
     * into 'unlocked' state:
     */
#ifndef CONFIG_DEBUG_MUTEXES
    /*
     * When debugging is enabled we must not clear the owner before time,
     * the slow path will always be taken, and that clears the owner field
     * after verifying that it was indeed current.
     */
    mutex_clear_owner(lock);   /* lock->owner = NULL; */
#endif
    /*
     * Atomically set lock->count to 1 (unlocked) and return its old value.
     * If the old value was 0, nobody else is waiting for the lock and we
     * return immediately; if it was not 0, there may still be waiters, so
     * take the slowpath (__mutex_unlock_slowpath) to wake up the next
     * sleeping waiter.
     */
    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

/*
 * Release the lock, slowpath:
 */
__visible void __mutex_unlock_slowpath(atomic_t *lock_count)
{
    struct mutex *lock = container_of(lock_count, struct mutex, count);

    __mutex_unlock_common_slowpath(lock, 1);
}

/*
 * Release the lock, slowpath:
 */
static inline void __mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
    unsigned long flags;

    /*
     * As a performance measurement, release the lock before doing other
     * wakeup related duties to follow. This allows other tasks to acquire
     * the lock sooner, while still handling cleanups in past unlock calls.
     * This can be done as we do not enforce strict equivalence between the
     * mutex counter and wait_list.
     *
     * Some architectures leave the lock unlocked in the fastpath failure
     * case, others need to leave it locked. In the later case we have to
     * unlock it here - as the lock counter is currently 0 or negative.
     *
     * (For performance, the lock is released first and the waiters on the
     *  wait queue are woken up afterwards, so other tasks get a chance to
     *  grab the lock earlier.)
     */
    if (__mutex_slowpath_needs_to_unlock())
        atomic_set(&lock->count, 1);

    /* spin_lock */
    spin_lock_mutex(&lock->wait_lock, flags);
    mutex_release(&lock->dep_map, nested, _RET_IP_);
    debug_mutex_unlock(lock);

    /* if lock->wait_list is not empty, wake up the next sleeping waiter */
    if (!list_empty(&lock->wait_list)) {
        /* get the first entry from the wait-list: */
        struct mutex_waiter *waiter =
                list_entry(lock->wait_list.next,
                           struct mutex_waiter, list);

        debug_mutex_wake_waiter(lock, waiter);

        /* wake up the task */
        wake_up_process(waiter->task);
    }

    /* spin_unlock */
    spin_unlock_mutex(&lock->wait_lock, flags);
}

mutex_lock_interruptible is implemented as follows:

/**
 * mutex_lock_interruptible - acquire the mutex, interruptible
 * @lock: the mutex to be acquired
 *
 * Lock the mutex like mutex_lock(), and return 0 if the mutex has
 * been acquired or sleep until the mutex becomes available. If a
 * signal arrives while waiting for the lock then this function
 * returns -EINTR.
 *
 * This function is similar to (but not equivalent to) down_interruptible().
 */
int __sched mutex_lock_interruptible(struct mutex *lock)
{
    int ret;

    might_sleep();
    ret = __mutex_fastpath_lock_retval(&lock->count);
    if (likely(!ret)) {
        mutex_set_owner(lock);
        return 0;
    } else
        return __mutex_lock_interruptible_slowpath(lock);
}

/**
 * __mutex_fastpath_lock_retval - try to take the lock by moving the count from 1 to a 0 value
 * @count: pointer of type atomic_t
 *
 * Change the count from 1 to a value lower than 1. This function returns 0
 * if the fastpath succeeds, or -1 otherwise.
 */
static inline int
__mutex_fastpath_lock_retval(atomic_t *count)
{
    if (unlikely(atomic_xchg(count, 0) != 1))
        if (likely(atomic_xchg(count, -1) != 1))
            return -1;
    return 0;
}

static noinline int __sched
__mutex_lock_interruptible_slowpath(struct mutex *lock)
{
    return __mutex_lock_common(lock, TASK_INTERRUPTIBLE, 0,
                               NULL, _RET_IP_, NULL, 0);
}
While waiting, mutex_lock_interruptible() puts the task into the TASK_INTERRUPTIBLE state, so the sleep can be interrupted by a signal.
mutex_lock() puts the waiting task into the TASK_UNINTERRUPTIBLE state.
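A typical calling pattern for the interruptible variant looks like the hypothetical fragment below, reusing the made-up struct my_device from the earlier initialization sketch: the return value must be checked, and when the wait is interrupted the error is usually propagated back to user space.

static long my_dev_do_ioctl(struct my_device *dev)
{
    if (mutex_lock_interruptible(&dev->io_lock))
        return -ERESTARTSYS;   /* interrupted by a signal, the lock was NOT taken */

    /* ... critical section, may sleep ... */

    mutex_unlock(&dev->io_lock);
    return 0;
}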
From this analysis of the implementation details, the mutex is considerably more efficient than the semaphore:
- the mutex tries an optimistic spin-wait first;
- the mutex tries to acquire the lock again before going to sleep;
- the mutex uses an MCS lock so that multiple CPUs contending for the lock do not cause CPU cache line bouncing.
In practice, how should one choose among the spinlock, the semaphore and the mutex?
In interrupt context, use a spinlock without hesitation. If the critical section may sleep, or calls kernel APIs that implicitly sleep, avoid the spinlock.
How to choose between the semaphore and the mutex? Unless the code does not satisfy the mutex constraints listed above, prefer the mutex.
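Putting that rule of thumb into code, the hypothetical fragment below uses a spinlock for state touched in the interrupt handler and a mutex for a process-context path that may sleep (copy_to_user() can fault and block, which is exactly why a sleeping lock is the right choice there). All names are invented for illustration; the locks are assumed to have been set up elsewhere with spin_lock_init() and mutex_init().

#include <linux/interrupt.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/uaccess.h>

struct sample_device {
    spinlock_t hw_lock;       /* protects state shared with the IRQ handler */
    struct mutex io_lock;     /* protects paths that may sleep */
    unsigned long irq_count;
    char buf[64];
};

static irqreturn_t sample_irq_handler(int irq, void *data)
{
    struct sample_device *dev = data;

    spin_lock(&dev->hw_lock);      /* interrupt context: never sleep here */
    dev->irq_count++;
    spin_unlock(&dev->hw_lock);
    return IRQ_HANDLED;
}

static long sample_read_buf(struct sample_device *dev, void __user *ubuf)
{
    long ret = 0;

    mutex_lock(&dev->io_lock);     /* process context: copy_to_user() may sleep */
    if (copy_to_user(ubuf, dev->buf, sizeof(dev->buf)))
        ret = -EFAULT;
    mutex_unlock(&dev->io_lock);
    return ret;
}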