MIT 6.S081 2020 Lab8 locks 个人全流程
零、写在前面
在本实验中,你将学习如何重新设计代码以提高并行性。在多核计算机上,并行性差的一个常见表现是高锁竞争。为了减少竞争、提升并行性,通常需要同时修改数据结构和加锁策略。你将在本实验中对 xv6 的内存分配器和块缓存进行相关改进。
记得切换分支到 lock
一、Memory allocator
1.1 说明
程序 user/kalloctest
用于对 xv6 的内存分配器进行压力测试:**三个进程会不断增长和缩小它们的地址空间,从而频繁调用 kalloc
和 kfree
。**这两个函数都会获取 kmem.lock
。kalloctest
会输出(以 #fetch-and-add
形式)在 acquire
函数中为了获取已被其他核心占用的锁而进行的循环次数,该数据适用于 kmem
锁以及其他一些锁。acquire
中的循环次数可以粗略地反映锁竞争的程度。
acquire
会记录每个锁被调用的次数(即 acquire()
)以及尝试获取锁失败而进入自旋的次数(即 #fetch-and-add
)。kalloctest
会调用一个系统调用,让内核打印 kmem
和 bcache
两个锁(本实验重点关注的对象)以及锁竞争最严重的前五个锁的这些统计信息。如果存在锁竞争,那么自旋次数会很大。该系统调用最终返回 kmem
和 bcache
锁的自旋总次数。
kalloctest
中锁竞争的根本原因在于 kalloc()
仅使用一个空闲链表,并由一个锁保护。为消除锁竞争,你需要重新设计内存分配器,避免单一锁和链表。基本思路是为每个 CPU 维护一个空闲链表,每个链表有自己的锁。这样,不同 CPU 上的分配和释放操作可以并行执行,因为它们各自操作独立的链表。
主要的挑战在于:当某个 CPU 的空闲链表为空,而另一个 CPU 的链表还有可用内存时,前者必须从后者“偷取”部分内存块。虽然偷取可能会导致锁竞争,但希望这种情况是少数。
你的任务是实现 每 CPU 一个空闲链表,并在链表为空时执行“偷取”逻辑。你必须为所有的锁命名以 "kmem"
开头。也就是说,在 initlock
中初始化每个锁时,传入的名字必须以 "kmem"
开头。
一些提示:
- NCPU 宏定义为 cpu 数目
- cpuid 函数用于获取当前cpu 的id
1.2 实现
官网的说明已经很直白了,为了尽可能避免锁竞争,我们为每一个CPU 维护一个空闲链表。
- 先把 原来的kmem 改为 kmem 数组
- 然后根据编译器的报错,把每个位置改成对对应cpuid 的 cpu 进行操作即可
- 值得注意的是,根据官网的要求,我们kalloc 如果对应cpu 不够的话,我们应该向其他 CPU “偷取” 部分内存块
struct {struct spinlock lock;struct run *freelist;
} kmem[NPROC];void kinit()
{for (int i = 0; i < NPROC; ++ i) {initlock(&kmem[i].lock, "kmem");}freerange(end, (void*)PHYSTOP);
}void
kfree(void *pa)
{struct run *r;int id = cpuid();if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)panic("kfree");// Fill with junk to catch dangling refs.memset(pa, 1, PGSIZE);r = (struct run*)pa;acquire(&kmem[id].lock);r->next = kmem[id].freelist;kmem[id].freelist = r;release(&kmem[id].lock);
}void *kalloc(void)
{struct run *r = 0;for (int i = 0, id = cpuid(); i < NPROC && !r; i++, ++ id) {if (id == NPROC) {id = 0;}acquire(&kmem[id].lock);r = kmem[id].freelist;if (r) {kmem[id].freelist = r->next;}release(&kmem[id].lock);}if (r) {memset((char *) r, 5, PGSIZE); // fill with junk}return (void *) r;
}
来测试一下:
二、Buffer cache
2.1 说明
这个作业的后半部分与前半部分是相互独立的;无论你是否完成了前半部分,你都可以进行这一部分的工作,并通过相应的测试。
如果多个进程密集地使用文件系统,它们可能会争用 bcache.lock
,这个锁用于保护 kernel/bio.c
中的磁盘块缓存。bcachetest
会创建多个进程反复读取不同的文件,以制造对 bcache.lock
的竞争。
你的任务是修改块缓存的实现,使得在运行 bcachetest
时,所有与块缓存相关的锁的 acquire
循环次数总和接近 0。理想情况下这个总和为 0,但只要总和小于 500 就可以接受。
你需要修改 bget
和 brelse
,使得对 不同块的并发查找和释放 不太可能在锁上产生冲突(比如:不必都等待 bcache.lock
)。但你必须保证每个块最多只缓存一份。
你必须为所有与块缓存相关的锁命名以 "bcache"
开头。也就是说,你在调用 initlock
初始化锁时,传入的锁名要以 "bcache"
开头。
与 kalloc
不同,减少块缓存的锁竞争更具挑战性,因为块缓存是多个进程(从而是多个 CPU)真正共享的。而 kalloc
可以通过为每个 CPU 提供独立的分配器来消除大部分竞争,但这种方法对块缓存并不适用。
我们建议你用哈希表来存放缓存块,用每个桶一个锁的方式来查找块号。
以下几种情况出现锁冲突是可以接受的:
- 两个进程同时访问相同的块号。
bcachetest
的 test0 不会发生这种情况。 - 两个进程在缓存中都未命中,同时需要找到一个未被使用的块进行替换。
bcachetest
的 test0 也不会发生这种情况。 - 两个进程访问的块恰好落在相同的哈希桶中(例如,它们的块号哈希到了同一个槽)。
bcachetest
的 test0 可能会发生这种情况,取决于你的设计,但你应尽量调整方案细节来减少这类冲突(比如改变哈希表大小)。
bcachetest
的 test1 使用的块比缓存中的块数量多,会触发文件系统中更多的代码路径。
一些提示
- 可以使用固定数量的桶,不必动态扩容哈希表。建议使用质数个桶(如 13)以减少哈希冲突;
- 在哈希表中查找缓存块,以及在缓存中未命中时分配新的缓存块,这两个步骤必须是原子的;
- 删除维护所有块的全局链表(如
bcache.head
等),而是使用最近一次使用的时间戳(可用kernel/trap.c
中的ticks
)标记每个块的使用时间。这样可以让brelse
不再需要获取bcache
锁,而bget
可以根据时间戳选择最近最少使用的块; - 可以在
bget
中串行化替换逻辑(即查找缓存失败时,选择一个块重新使用的部分); - 某些情况下你可能需要同时持有两个锁;例如,在替换缓存块时可能需要同时持有
bcache.lock
和某个桶的锁。一定要避免死锁; - 替换块时,可能需要将
struct buf
从一个桶移动到另一个桶中(因为新块号哈希到了不同的桶)。特别注意一种情况:新旧块可能哈希到了同一个桶 —— 此时一定要注意避免死锁; - 调试建议:可以在实现桶锁时暂时保留 bcache 全局锁的 acquire/release 操作(比如放在
bget
的开头和结尾),以串行化整个逻辑;确保逻辑无竞态后,再移除全局锁并处理并发问题。
2.2 实现
-
我们需要改进块缓存的实现,官网的hint 提示我们利用哈希 以及 LRU。
-
为了降低锁竞争,我们需要用多个锁来替代原来的单个锁,官网也提示我们为每一个哈希桶开一个锁
-
为了实现 LRU,官网提示我们使用 时间戳,那么我们替换的时候替换时间戳最小(最久未使用)的那个
-
先修改下原来的 struct buf
-
因为不再基于双向链表头删头插实现LRU,所以删去原来的链表设计
-
增加时间戳字段 timestamp
struct buf {int valid; // has data been read from disk?int disk; // does disk "own" buf?uint dev;uint blockno;struct sleeplock lock;uint refcnt;uchar data[BSIZE];uint timestamp;
};
- 相应的,修改下 bcache 结构体
- 长度 为 质数13
- 原来结构体里面buf 长度为 30,现在我们有 13个 bcache,所以长度改为 ceil(30 / 13) = 3
#define NBUCKET 13
#define NBUFFER 3struct {struct spinlock lock;struct buf buf[NBUFFER];
} bcache[NBUCKET];uint GLOBAL_TIME_STAMP = 0; // global timestamp
- 初始化逻辑
void
binit(void)
{for (int i = 0; i < NBUCKET; ++i) {initlock(&bcache[i].lock, "bcache");for (int j = 0; j < NBUFFER; ++j) {bcache[i].buf[j].timestamp = 0; initsleeplock(&bcache[i].buf[j].lock, "buffer");}}
}
为了方便后续操作,我们添加俩辅助函数:
- hash:获取哈希表下标
- update_timestamp:更新时间戳
// hash x to bucket id
uint hash(int x) {return x % NBUCKET;
}
// inc timestamp
void update_timestamp(struct buf *b) {b->timestamp = __sync_fetch_and_add(&GLOBAL_TIME_STAMP, 1); //atomic
}
根据编译器的定位,我们需要修改 bget()、
brelse()、
bpin()、
bunpin() 四处 使用了 bcache.lock 的地方
bget()
- 第一遍查找对应桶里面能不能找到
- 第二遍找空闲 buf
- 找到,就分配
- 否则,lru替换策略
- 分配记得清空初始化
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{struct buf *b;uint bucket_id = hash(blockno);acquire(&bcache[bucket_id].lock);// Is the block already cached?for(int i = 0; i < NBUFFER; ++ i) {b = &bcache[bucket_id].buf[i];if(b->dev == dev && b->blockno == blockno) {++ b->refcnt;update_timestamp(b);release(&bcache[bucket_id].lock);acquiresleep(&b->lock);return b;}}release(&bcache[bucket_id].lock);// -1 in uint is uint_maxuint minTim = -1;struct buf *lru = 0; // least used buf// Not cached.// Recycle the least recently used (LRU) unused buffer.for(int i = 0, cur_bucket = bucket_id; i < NBUCKET; ++i, ++cur_bucket) {if(cur_bucket == NBUCKET){cur_bucket = 0; }acquire(&bcache[cur_bucket].lock);for(int j = 0; j < NBUFFER; ++j) {b = &bcache[cur_bucket].buf[j];// not usedif(b->refcnt == 0) { b->dev = dev;b->blockno = blockno;b->valid = 0;b->refcnt = 1;update_timestamp(b);release(&bcache[cur_bucket].lock);acquiresleep(&b->lock);return b;}if(b->timestamp < minTim) {minTim = b->timestamp;lru = b;}}release(&bcache[cur_bucket].lock);}if (lru) {acquire(&bcache[bucket_id].lock);lru->dev = dev;lru->blockno = blockno;lru->valid = 0;lru->refcnt = 1;update_timestamp(lru);release(&bcache[bucket_id].lock);acquiresleep(&lru->lock);return lru;}panic("bget: no buffers");
}
brelse()
- 只需要改成根据桶id 获取锁就行
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))panic("brelse");releasesleep(&b->lock);int bucket_id = hash(b->blockno);acquire(&bcache[bucket_id].lock);--b->refcnt;release(&bcache[bucket_id].lock);releasesleep(&b->lock);
}
bpin()
- 同样改一下获取锁的方式
void
bpin(struct buf *b) {int bucket_id = hash(b->blockno);acquire(&bcache[bucket_id].lock);++b->refcnt;release(&bcache[bucket_id].lock);
}
bpin()
- 同样改一下获取锁的方式
void
bunpin(struct buf *b) {int bucket_id = hash(b->blockno);acquire(&bcache[bucket_id].lock);--b->refcnt;release(&bcache[bucket_id].lock);
}
测试下: