robust futex
简介
概述: 为了保证futex的robustness,添加了一种能够用于处理进程异常终止时锁状态的机制,就是当拥有锁的线程异常终止时,该线程能够将锁进行释放
基本原理: 每个线程都有一个私有的robust list,这个list在线程生命周期只能注册一次。在线程退出时,kernel会检查这个list,并进行相应的清理操作。
线程在执行do_exit()时常见的情况以及相应的处理方式:
- 常见情况:在大多数情况下,在线程执行do_exit函数时,kernel并没有注册任何list,因此对于robust list的处理成本仅仅是进行一个简单的比较,即检查当前线程的robust list是否为空
- 已注册list的情况:如果线程已经注册了list,通常这个list也大概率是空的,如果是正常退出的
- 线程异常退出的情况:如果注册的list不为空,则kernel会遍历这个list,并将所有由该线程拥有的锁标记为FUTEX_OWNER_DIED,然后唤醒一个waiter
潜在的竞争条件: 由于添加和删除list是在获取锁之后进行的,所以给线程留下了一个小窗口,在此期间可能导致线程异常退出,从而使锁被悬挂,为了防止这种可能性,还需要多维护一个list_op_pending字段,并在完成列表的添加或删除操作后将其清除【存在疑惑】
实现细节
新系统调用:
-
sys_set_robust_list:注册robust list,接收一个指向robust list head的指针和list head的长度作为参数
long get_robust_list(int pid, struct robust_list_head **head_ptr, size_t *len_ptr);
-
sys_get_robust_list:查询已注册的robust list,接收一个PID作为参数,并返回指向robust list head的指针和len指针。
-
如果传入的PID为0,则返回调用线程自身的robust list head指针
-
long set_robust_list(struct robust_list_head *head, size_t len);
-
-
当操作成功,返回0,否则返回错误码:
- set_robust_list():
- EINVAL:len和kernel预期的结构体robust_list_head的大小不匹配
- get_robust_list():
- ERERM:调用进程无权查看TID为PID的线程的robust list,并且不具有CAP_SYS_PTRACE功能(这是一个linux中的特权能力,用于允许进程进行调试和trace其他进程,具有该能力的成可以使用ptrace()系统调用来观察和控制其他进程的执行,包括读取和修改其内存、寄存器和指令流)
- ESRCH:找不到TID为PID的线程
- EFALUT:robust list head不能存储在location head中
- set_robust_list():
注册list: robust list指针简单存储在robust_list字段中,如果robus futex比较普通,可以通过扩展sys_clone来为新线程注册robust list head,而无需通过sys_set_robust_list注册
系统调用开销:
- 不使用robust futex的任务:几乎没有额外开销
- 使用robust futex的任务:每个线程的生命周期只会增加一个额外的系统调用,清理futex的操作时快速且简单的
Futex字段值
- 如果未获取锁,futex值的后面30位(即TID位)为0,所以只需要看后面30位是否有值就知道该futex是否被上锁
- 如果锁被获取,futex值的后面30位(即TID位)为拥有该锁的线程的TID
- 如果锁时拥有的,并且有线程争用锁,则应在futex中标识FUTEX_WAITERS位,即FUTEX_WAITERS | TID(FUTEX_WAITERS为0x8000_0000,也就是第一位)
- 如果线程退出时发现某个robust futex仍被持有,kernel会设置相应的futex字段的FUTEX_OWNER_DIED位(0x4000_0000,也就是第二位),然后唤醒下一个waiter(如果存在)。其余清理工作由用户空间完成
代码示例
使用示例程序
代码仓库: 1037827920/Futex-Example-Program
Rust示例程序:
use libc::{syscall, SYS_futex, SYS_get_robust_list, SYS_gettid, SYS_set_robust_list, FUTEX_WAIT,FUTEX_WAKE,
};
use std::{io, mem, panic, ptr,sync::{atomic::{AtomicU32, Ordering},Arc, Mutex,},thread,time::Duration,
};const FUTEX_INIT: u32 = 0x0000_0000;
const FUTEX_WAITERS: u32 = 0x8000_0000;
const FUTEX_TID_MASK: u32 = 0x3fff_ffff;#[repr(C)]
#[derive(Debug, Clone)]
struct RobustList {next: *mut RobustList,// futex: u32, (通过偏移量访问)
}#[repr(C)]
#[derive(Debug, Clone)]
struct RobustListHead {list: RobustList,/// 这个偏移量的作用是为了能够通过(list项的地址+偏移量)得到futex用户空间地址/// futex_offset的单位为字节,即1表示一个字节的偏移量/// 在用户空间添加这个字段还能进行灵活编码偏移量而不是在内核硬编码futex_offset: i64,list_op_pending: *mut RobustList,
}impl RobustListHead {fn new(futex_offset: i64) -> RobustListHead {RobustListHead {list: RobustList {next: ptr::null_mut(),},futex_offset,list_op_pending: ptr::null_mut(),}}fn push(&mut self, futex: AtomicU32) {let new_node = Box::into_raw(Box::new(RobustList {next: self.list.next,}));unsafe {((new_node as *mut u8).offset(self.futex_offset as isize) as *mut AtomicU32).write(futex);}self.list.next = new_node;}fn print_robust_list(&self) {let mut current = self.list.next;while !current.is_null() {unsafe {print!("{:?}(futex: {:?}, futex_val: {:?}) -> ",current,(current as *const u8).add(self.futex_offset as usize),((current as *const u8).add(self.futex_offset as usize) as *const AtomicU32).read(),);current = (*current).next;}}println!("NULL");}fn get_futex(&self, index: usize) -> *const AtomicU32 {let mut current = self.list.next;for _ in 0..index {if current.is_null() {return ptr::null_mut();}unsafe {current = (*current).next;}}if current.is_null() {return ptr::null_mut();}unsafe { (current as *const u8).add(self.futex_offset as usize) as *mut AtomicU32 }}
}unsafe impl Send for RobustListHead {}
unsafe impl Sync for RobustListHead {}fn main() {test_set_and_get_robust_list();test_robust_futex();
}fn futex_wait(futex: *const AtomicU32, thread: &str, tid: i64) {let futex_ref = unsafe { &*futex };loop {// 如果当前futex没有被其他线程持有if (futex_ref.load(Ordering::SeqCst) & FUTEX_TID_MASK) == 0 {futex_ref.swap(tid as u32, Ordering::SeqCst);// 加锁后直接返回,这样就不用执行系统调用,减少一定开销println!("线程{thread}上锁成功, futex值: {:#x}",(*futex_ref).load(Ordering::SeqCst));return;}// 标识futex的FUTEX_WAITERS位futex_ref.fetch_or(FUTEX_WAITERS, Ordering::SeqCst);println!("线程{thread}正在等待futex, futex值: {:#x}",(*futex_ref).load(Ordering::SeqCst));let ret = unsafe {syscall(SYS_futex,futex_ref as *const AtomicU32 as *mut u32,FUTEX_WAIT,futex_ref.load(Ordering::SeqCst),0,0,0,)};if ret == -1 {panic!("futex_wait系统调用执行失败");}}
}fn futex_wake(futex: *const AtomicU32, thread: &str) {let futex_ref = unsafe { &*futex };let ret = unsafe {syscall(SYS_futex,futex_ref as *const AtomicU32 as *mut u32,FUTEX_WAKE,1,0,0,0,)};if ret == -1 {panic!("futex_wake系统调用执行失败");}futex_ref.store(FUTEX_INIT, Ordering::SeqCst);println!("线程{thread}释放锁");
}/// 向kernel注册一个robust list
fn set_robust_list(robust_list_head_ptr: *const RobustListHead) {let ret = unsafe { syscall(SYS_set_robust_list, robust_list_head_ptr, mem::size_of::<RobustListHead>()) };if ret == -1 {panic!("set_robust_list系统调用执行失败, Err: {:?}",io::Error::last_os_error());}
}/// 获取kernel注册的robust list
/// 当pid=0时,表示获取当前进程的robust list
fn get_robust_list(pid: i32, robust_list_head_ptr: &mut *mut RobustListHead, len: &mut usize) {let ret = unsafe { syscall(SYS_get_robust_list, pid, robust_list_head_ptr, len) };if ret == -1 {panic!("get_robust_list系统调用执行失败, Err: {:?}",io::Error::last_os_error());}
}/// 测试set_robust_list系统调用和get_robust_list系统调用
fn test_set_and_get_robust_list() {// 初始化robust list head// 8字节为RobustList的大小let mut robust_list_head = RobustListHead::new(8);// 初始化robust listrobust_list_head.push(AtomicU32::new(FUTEX_INIT));robust_list_head.print_robust_list();// 设置robust listlet robust_list_head_ptr = &robust_list_head as *const RobustListHead;println!("robust_list_head_ptr: {:?}, len: {:?}",robust_list_head_ptr, mem::size_of::<RobustListHead>());set_robust_list(robust_list_head_ptr);// 获取robust listlet mut robust_list_head_ptr_geted: *mut RobustListHead = ptr::null_mut();let mut len_geted: usize = 0;get_robust_list(0, &mut robust_list_head_ptr_geted, &mut len_geted);println!("robust_list_head_ptr_geted: {:?}, robust_list_head: {:?} len_geted: {:?}",robust_list_head_ptr_geted,unsafe { &*robust_list_head_ptr_geted },len_geted);
}/// 测试一个线程异常退出时,robust list的表现
fn test_robust_futex() {// 创建robust list head// 8字节为RobustList的大小let robust_list_head = Arc::new(Mutex::new(RobustListHead::new(8)));// 初始化robust listlet mut robust_list_head_guard = robust_list_head.lock().unwrap();(*robust_list_head_guard).push(AtomicU32::new(FUTEX_INIT));drop(robust_list_head_guard);// 线程1let robust_list_head_clone1 = robust_list_head.clone();let thread1 = thread::spawn(move || {let tid = unsafe { syscall(SYS_gettid) };println!("线程1的线程号: {tid}");// 向kernel注册robust listlet robust_list_head_guard = robust_list_head_clone1.lock().unwrap();let robust_list_head_ptr = &(*robust_list_head_guard) as *const RobustListHead;set_robust_list(robust_list_head_ptr);// 尝试获取锁let futex = robust_list_head_guard.get_futex(0);futex_wait(futex, "1", tid);// 执行具体的业务逻辑thread::sleep(Duration::from_secs(5));// 模拟线程异常退出,是否会正常把未释放的锁释放掉println!("thread1异常退出");return;});thread::sleep(Duration::from_secs(3));// 线程2let robust_list_head_clone2 = robust_list_head.clone();let thread2 = thread::spawn(move || {let tid = unsafe { syscall(SYS_gettid) };println!("线程2的线程号: {tid}");// 向kernel注册robust listlet robust_list_head_guard = robust_list_head_clone2.lock().unwrap();let robust_list_head_ptr = &(*robust_list_head_guard) as *const RobustListHead;set_robust_list(robust_list_head_ptr);// 尝试获取锁let futex = robust_list_head_guard.get_futex(0);println!("线程2尝试获取锁, futex值: {:#x}",unsafe { &*futex }.load(Ordering::SeqCst));futex_wait(futex, "2", tid);// 执行具体的业务逻辑thread::sleep(Duration::from_secs(3));// 释放锁futex_wake(futex, "2");});thread1.join().unwrap();thread2.join().unwrap();
}
C示例程序:
#include <stdio.h>
#include <linux/futex.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <stdint.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>#define FUTEX_INIT 0x00000000
#define FUTEX_WAITERS 0x80000000
#define FUTEX_TID_MASK 0x3fffffff// 定义一个结构体来封装线程任务参数
typedef struct {struct robust_list_head* rlh;int thread;
} thread_task_args;// 向robust list插入新节点
void robust_list_push(struct robust_list_head* rlh, atomic_uint futex) {struct robust_list* new_robust_list = (struct robust_list*)malloc(sizeof(struct robust_list));if (new_robust_list == NULL) {perror("new robust list分配内存失败");}// 通过偏移量将futex写入新robust listatomic_uint* futex_ptr = (atomic_uint*)((uint8_t*)new_robust_list + rlh->futex_offset);*futex_ptr = futex;rlh->list.next = new_robust_list;
}// 获取索引为index的futex地址
atomic_uint* robust_list_get_futex(struct robust_list_head* rlh, size_t index) {struct robust_list* current = rlh->list.next;size_t i = 0;while (current != NULL) {if (i == index) {return (atomic_uint*)((uint8_t*)current + rlh->futex_offset);}current = current->next;i++;}return NULL;
}// 打印robust list
void robust_list_print(struct robust_list_head* rlh) {struct robust_list* current = rlh->list.next;while (current != NULL) {atomic_uint* futex_ptr = (atomic_uint*)((uint8_t*)current + rlh->futex_offset);atomic_int futex = *futex_ptr;printf("%p(futex: %p, futex_val: %u) -> ", (void*)current, (void*)futex_ptr, futex);current = current->next;}printf("NULL\n");
}// 向kernel注册robust list
void set_robust_list(struct robust_list_head* rlh) {long ret = syscall(SYS_set_robust_list, rlh, sizeof(struct robust_list_head));if (ret == -1) {perror("set_robust_list系统调用执行失败\n");}
}// 从kernel获取robust list
void get_robust_list(int pid, struct robust_list_head** rlh, size_t* len) {long ret = syscall(SYS_get_robust_list, pid, rlh, len);if (ret == -1) {perror("get_robust_list系统调用执行失败\n");}
}// futex_wait函数
void* futex_wait(atomic_uint* futex, int thread, long tid) {while (1) {// 如果当前futex没有其他线程持有if ((*futex & FUTEX_TID_MASK) == 0) {atomic_exchange(futex, (unsigned int)tid);// 加锁后直接返回printf("线程%d上锁成功. futex值: 0x%x\n", thread, *futex);return NULL;}// 线程进入等待状态atomic_fetch_or(futex, FUTEX_WAITERS);printf("线程%d正在等待futex, futex值: 0x%x\n", thread, *futex);long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAIT, *futex, 0, 0, 0);if (ret == -1) {perror("futex_wait系统调用执行失败\n");return NULL;}}
}// futex_wake函数
void* futex_wake(atomic_uint* futex, int thread) {long ret = syscall(SYS_futex, (unsigned*)futex, FUTEX_WAKE, 1, 0, 0, 0);if (ret == -1) {perror("futex_wake系统调用执行失败\n");return NULL;}atomic_store(futex, FUTEX_INIT);printf("线程%d释放锁\n", thread);return NULL;
}// 测试set_robust_list系统调用和get_robust_list系统调用
void test_set_and_get_robust_list() {// 创建一个robust list headstruct robust_list_head* rlh = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));rlh->futex_offset = 8;// 初始化robust listrobust_list_push(rlh, 0x00000000);robust_list_print(rlh);// 设置robust listprintf("robust_list_head_ptr: %p, len: %ld\n", rlh, sizeof(struct robust_list_head));set_robust_list(rlh);// 获取robust liststruct robust_list_head* rlh_geted = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));size_t len_geted;get_robust_list(0, &rlh_geted, &len_geted);printf("robust_list_head_geted_ptr: %p, len_geted: %ld\n", rlh_geted, len_geted);
}void* thread1_task(void* arg) {thread_task_args* args = (thread_task_args*)arg;// robust list head指针struct robust_list_head* rlh = args->rlh;// 线程号int thread = args->thread;// TIDlong tid = syscall(SYS_gettid);// 注册robust listset_robust_list(rlh);// 尝试获取锁atomic_uint* futex = robust_list_get_futex(rlh, 0);futex_wait(futex, thread, tid);// 执行具体的业务逻辑sleep(5);// 线程异常退出printf("线程%d异常退出\n", thread);pthread_exit(NULL);
}void* thread2_task(void* arg) {thread_task_args* args = (thread_task_args*)arg;// robust list head指针struct robust_list_head* rlh = args->rlh;// 线程号int thread = args->thread;// TIDlong tid = syscall(SYS_gettid);// 注册robust listset_robust_list(rlh);// 尝试获取锁atomic_uint* futex = robust_list_get_futex(rlh, 0);futex_wait(futex, thread, tid);// 执行具体的业务逻辑sleep(5);// 释放锁futex_wake(futex, thread);return NULL;
}// 测试一个线程异常退出时,robust list的表现
void test_robust_futex() {// 线程句柄pthread_t t1, t2;// 创建robust list headstruct robust_list_head* rlh = (struct robust_list_head*)malloc(sizeof(struct robust_list_head));// 初始化robust listrobust_list_push(rlh, 0x00000000);// 构建线程参数thread_task_args args1 = { rlh, 1 };thread_task_args args2 = { rlh, 2 };// 线程1pthread_create(&t1, NULL, thread1_task, (void*)&args1);// 等线程1先获取futex锁sleep(3);// 线程2pthread_create(&t2, NULL, thread2_task, (void*)&args2);// 等待线程结束pthread_join(t1, NULL);pthread_join(t2, NULL);
}int main() {test_set_and_get_robust_list();test_robust_futex();
}
运行结果:
实现示例程序
【待完成】
参考
- futex(2) - Linux 手册页 — futex(2) - Linux manual page