Horbynz hub

⌈xv6-fall2021⌋ lab 8:locks

Word count: 7.1kReading time: 29 min
2022/04/15

ABOUT

实验地址:Lab: locks


INTRO

这次 lab 难度非常大,主要是第二个 assignment,除去实验要求没有提到的一个踩坑点外,涉及 “大锁低效率,小锁易死锁” 的思想,除此之外还要考虑的是死锁问题,当然如果你坚持用大锁的话可能可以避免。至于死锁,我觉得,从代码的角度来调试真的很困难,我个人检查死锁的方法是设想一个死锁场景,然后跟着自己代码的逻辑看能不能走到。总之,这个 lab 难度确实不小,极具挑战性!


Memory allocator (MODERATE)

实验要求(译)(节选)

The program user/kalloctest stresses xv6’s memory allocator: three processes grow and shrink their address spaces, resulting in many calls to kalloc and kfree. kalloc and kfree obtain kmem.lock. kalloctest prints (as “#fetch-and-add”) the number of loop iterations in acquire due to attempts to acquire a lock that another core already holds, for the kmem lock and a few other locks. The number of loop iterations in acquire is a rough measure of lock contention.

“程序 user/kalloctest 加重了 xv6 内存分配器的压力:三个进程增加或减少它们的地址空间,这导致大量的 kalloc()kfree() 调用,而这两个函数的调用又涉及 kmem.lock 的获取。kalloctest 打印了试图获得其他进程拥有的锁而调用 acquire() 的次数(像 ‘#fetch-and-add’ 这样),包括 kmem 和一些其他锁”

acquire maintains, for each lock, the count of calls to acquire for that lock, and the number of times the loop in acquire tried but failed to set the lock. kalloctest calls a system call that causes the kernel to print those counts for the kmem and bcache locks (which are the focus of this lab) and for the 5 most contended locks. If there is lock contention the number of acquire loop iterations will be large. The system call returns the sum of the number of loop iterations for the kmem and bcache locks.

acquire() 为每个锁维持对该锁的调用次数,以及维持 acquire() 尝试立即获取锁但失败的次数。kalloctest 调用一个导致内核打印 kmembcache 以及其他 5 个被争用的锁数量的系统调用。如果有锁竞争,那么 acquire() 循环数量会非常大。这个系统调用返回 kmembcache 锁的调用总数”

The root cause of lock contention in kalloctest is that kalloc() has a single free list, protected by a single lock. To remove lock contention, you will have to redesign the memory allocator to avoid a single lock and list. The basic idea is to maintain a free list per CPU, each list with its own lock. Allocations and frees on different CPUs can run in parallel, because each CPU will operate on a different list. The main challenge will be to deal with the case in which one CPU’s free list is empty, but another CPU’s list has free memory; in that case, the one CPU must “steal” part of the other CPU’s free list. Stealing may introduce lock contention, but that will hopefully be infrequent.

kalloctest 的锁竞争的根本条件是 kalloc() 只有一个空闲链表,该链表由一把锁保护。为了消除锁竞争,你要重新涉及一个内存分配器用来避免单个锁、链表的竞争。基本思想是为每个 CPU 维持一个空闲链表,并且每个链表都有自己的锁。这样不同 CPU 上的分配和释放都可以并行运行,因为每个 CPU 可以在不同链表上操作。最大的问题是一个 CPU 的空闲链表为空时,此时另一个 CPU 的空闲链拥有空闲块,那么就可以 ‘偷’ 其他 CPU 链表的其中一部分。偷这个动作可能会导致锁竞争,但在这个实验里不频繁”

Your job is to implement per-CPU freelists, and stealing when a CPU’s free list is empty. You must give all of your locks names that start with “kmem”. That is, you should call initlock for each of your locks, and pass a name that starts with “kmem”. Run kalloctest to see if your implementation has reduced lock contention.

“你的任务是为每个 CPU 实现各自的空闲链表,并且当链表空时需要去其他 CPU 的链表偷空闲块。你必须以 ‘kmem’ 打头命名你的全部锁,即,为每个锁调用 initlock() 并传入以 kmem 打头的形参。运行 kalloctest 检查你是否减少了锁竞争”


实验思路

总结下来这个 assignment 就两件事:

  • 为每个 CPU 维持一个空闲链表,并且每个链表一把锁
  • 当链表空,要 “偷” 其他 CPU 的链表

Solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// path: kernel/kalloc.c
struct freelist {
struct spinlock lock;
struct run *freelist;
};
struct freelist kmem[NCPU];

void
kinit() {
// 重命名所有锁,并初始化
for (int i = 0; i < NCPU; ++i) {
char lockname[8];
snprintf(lockname, 8, "kmem", i);
initlock(&kmem[i].lock, lockname);
}
freerange(end, (void*)PHYSTOP);
}

void
kfree(void *pa) {
...
r = (struct run*)pa;

push_off();
int id = cpuid();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
pop_off();
}

void *
kalloc(void) {
struct run *r;

push_off();
int id = cpuid();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
else {
// 偷其他 CPU 的链表结点
for (int i = 0; i < NCPU; ++i) {
if (kmem[i].freelist) {
acquire(&kmem[i].lock);
r = kmem[i].freelist;
kmem[i].freelist = kmem[i].freelist->next;
release(&kmem[i].lock);
break;// 偷到一个就可以退出了,不然偷多了会内存泄漏
}
}
}
release(&kmem[id].lock);
pop_off();

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

Buffer cache (HARD)

实验要求(译)(节选)

If multiple processes use the file system intensively, they will likely contend for bcache.lock, which protects the disk block cache in kernel/bio.c. bcachetest creates several processes that repeatedly read different files in order to generate contention on bcache.lock

“如果多个进程密集地使用文件系统,它们很可能会竞争 bcache.lock,这是 kernel/bio.c 定义得,用来保护磁盘上磁盘块的锁。bcachetest 创建了多个进程,重复读不同文件只为测试 bcache.lock 的竞争”

Modify the block cache so that the number of acquire loop iterations for all locks in the bcache is close to zero when running bcachetest. Ideally the sum of the counts for all locks involved in the block cache should be zero, but it’s OK if the sum is less than 500. Modify bget and brelse so that concurrent lookups and releases for different blocks that are in the bcache are unlikely to conflict on locks (e.g., don’t all have to wait for bcache.lock). You must maintain the invariant that at most one copy of each block is cached.

“修改 block cache 以便在 bcache 里的所有锁的 acquire() 次数在运行 bcachetest 时接近零。最理想的在 bcache 调用所有锁的总数应该是零,但其实小于 500 也没问题。修改 bget()brelse() 以便并发的寻找和释放 bcache 里不同的块不会导致锁竞争(如不用全部都等 bcache.lock)。你必须维持每个已缓存块的不变量最多只有一个副本”

Reducing contention in the block cache is more tricky than for kalloc, because bcache buffers are truly shared among processes (and thus CPUs). For kalloc, one could eliminate most contention by giving each CPU its own allocator; that won’t work for the block cache. We suggest you look up block numbers in the cache with a hash table that has a lock per hash bucket.

“在 block cache 里减少竞争比 kalloc() 里的情况更难,因为 bcache 里的缓冲区是所有进程(以及所有 CPU)都共享的。对于 kalloc(),你可以通过给每个 CPU 它自己的分配器而消除大多竞争,但对于 block cache 却行不通。我们建议用哈希表来寻找 block cache 里的块号,这个哈希表设置为每个哈希 bucket 一把锁

There are some circumstances in which it’s OK if your solution has lock conflicts:

  • When two processes concurrently use the same block number. bcachetest test0 doesn’t ever do this.
  • When two processes concurrently miss in the cache, and need to find an unused block to replace. bcachetest test0 doesn’t ever do this.
  • When two processes concurrently use blocks that conflict in whatever scheme you use to partition the blocks and locks; for example, if two processes use blocks whose block numbers hash to the same slot in a hash table. bcachetest test0 might do this, depending on your design, but you should try to adjust your scheme’s details to avoid conflicts (e.g., change the size of your hash table).

bcachetest’s test1 uses more distinct blocks than there are buffers, and exercises lots of file system code paths.

"以下几种情况锁冲突的情况是没问题的:

  • 当两个并发进程使用同一个块号。bcachetest test0 不检查
  • 当两个并发进程 cache 不命中,而需要找一个未用过的块来使用。bcachetest test0 也不检查这个问题
  • 当两个并发进程,在无论你如何细分锁的粒度、如何调度磁盘块的这种场景下,使用的块都会产生竞争。例如,如果两个进程使用的块都映射到同一个地方。bcachetest test0 会检查这种情况。但主要看你怎么实现,或者你需要关注细节的问题来规避这个问题(如改变哈希表的大小)

bcachetest test1 使用比缓冲区大得多的不同块,并且涉及大量的文件系统调用"


实验提示(译)(节选)

Searching in the hash table for a buffer and allocating an entry for that buffer when the buffer is not found must be atomic.

“当 cache 不命中时,在整个哈希表中寻找一个 buffer,并且为这个 buffer 填充磁盘块的属性。以上几个步骤必须是原子操作”

Remove the list of all buffers (bcache.head etc.) and instead time-stamp buffers using the time of their last use (i.e., using ticks in kernel/trap.c). With this change brelse doesn’t need to acquire the bcache lock, and bget can select the least-recently used block based on the time-stamps.

“删除所有 buffer 中的链表(bcache.head 等),取而代之的是进程最后一次使用的时间点(即使用 kernel/trap.c 里的 ticks 变量)。经过这个更改后 brelse() 不需要获取 bcache 的锁了,并且 bget() 的 LRU 算法变成基于时间戳的 LRU 算法”

It is OK to serialize eviction in bget (i.e., the part of bget that selects a buffer to re-use when a lookup misses in the cache).

“可以在 bget() 里连续地进行 eviction (eviction 即 bget() 在 cache 不命中时选择并重复使用一个 buffer 这部分代码)”(译者注,连续地 eviction 即连续地遍历链表,从而寻找可用的空闲块

When replacing a block, you might move a struct buf from one bucket to another bucket, because the new block hashes to a different bucket. You might have a tricky case: the new block might hash to the same bucket as the old block. Make sure you avoid deadlock in that case.

“当取代一个块时,你可以需要从一个块移动 struct buf 到另一个块,因为新块是映射到另一个不同的块上的。你可能会遇到新块映射到旧块上的困难,此时你需要确保不发生死锁”(译者注,事实上改变哈希映射时不一定要移动块。比如你使用链表,链表元素是块地址,那么改变哈希就只需改变链表地址元素的指向就够了


实验思路

首先还是要弄懂这个 assignment 到底要我们做一个什么样的事情:

如图所示,原来整个 block cache 只由一把锁保护,那么所有并发进程都要争用这把锁,这势必要等待,就降低了效率。因此,实验希望把整个大锁拆分掉,分成许多把小锁。这样并发进程请求资源就可以尽可能地分散开来,避免了争用同一把锁

然后就要想怎么细分这把大锁,其实上面实验要求已经提示了,“We suggest you…” 这一句,简单来说就是在大锁之下、每个 cached buffer 元素之上,再抽象一层,在这一层引入哈希表。具体来说就是下图的逻辑:

原来逻辑为什么是双链表,详见 kernel/bio.c/binit(),现在只需根据实验要求引入哈希表,这里键值对应该可以想到 <blockno#, cached buffer 数组索引> 的形式,即给出一个块号,返回 cached buffer 数组的一个索引

现在有个问题可以想想,实验提示说到 “With this change brelse doesn’t need to acquire the bcache lock”,为什么引入哈希表以后,释放块就不需要持有 bcache.lock 了?

按原来链表的逻辑,假设现在 PA、PB 并发调用 brelse(),且恰好各自形参的引用都是 1

接下来 PA、PB 都访问 block cache 链表,且都是在链头插入各自的块,那么后来那个进程可能覆盖前面的数据、又或者两个块的写入交替进行,这便形成了 race condition,所以原来逻辑需要一把锁确保每次只能有一个进程访问

那么现在,引入哈希表之后,你释放的块只能是你自己哈希 bucket 上的链表的,难道可以释放别人链表的块?因此,既然是自己的链表,只需要自己的锁就行,就不需要大锁 bcache.lock

还有一个小问题。就是这个 LRU 算法该怎么理解?到底是使用时长最短?还是使用顺序的先后?

这就要看 LRU 算法的含义了————最近最少使用,即最近使用过的不会淘汰,而不是使用时间最短的淘汰,这两者概念不同

再看 kernel/trap.c/ticks 这个变量,它的含义就是随着开机时间延迟而不断自增

结合起来,最近使用过的 ticks 数值上必然最大,反而很久以前就开始使用的进程,它的 ticks 就很小。所以 “最近最少使用” 说的就是最小的 ticks

引入 ticks 这个逻辑后,原来 LRU 链表连来连去那部分逻辑就可以删掉了

现在需要考虑一个最关键的问题,就是 “大锁”、“小锁” 到底是什么东西?到底怎么理解?

如果你看过 xv6 handout,第六章,你应该了解一个矛盾的思想————“大锁低效率,小锁易死锁”,这又怎么理解?

结合这个 lab 的需求来看,大锁就是 bcache.lock,负责保护全部的公共数据,这个保护具体来说就是每次只允许一个进程访问。那么,当你并发假设 10 个进程,虽然这 10 个进程是同时访问资源的,但大锁保证了一种顺序形式的资源访问次序,结果就是这 10 个进程逻辑上等同于 1 个进程的顺序执行————这就是 “大锁低效率” 的由来,但是另一方面,大锁保证了正确性,因为资源的访问井然有序,不可能出现某种资源的争用

这也就是这个 assignment 的目的,表面上是减少大锁的使用,但目的是提高并发的效率。试想下 10 个进程同时运行快?还是 10 个进程一个一个地运行快?

现在再考虑下,怎么做才能减少大锁的使用?一个极端的例子是不用大锁,只用小锁(在这个 lab 里,小锁就是每个哈希 bucket 的锁)。那只用小锁会有问题吗?答案是往往很容易导致死锁————这就是 “小锁易死锁” 的由来

原来只有一把大锁的时候,并发进程运行虽然慢点,但好歹能往下运行。但现在使用了小锁,却很可能陷入死局而致进程无法往下推进————P2 持有自己的锁,请求 P5 的锁;同时,P5 持有自己的锁,请求 P2 的锁————只要 P2 没完成,P2 不可能释放自己的锁,P5 也一样,大家都在等待对方先完成,这就大家都完成不了

所以正如 xv6-handout 说的那样,这两个粒度的锁其实带来了很矛盾的影响,来看看题目最开始这个输出就可以理解了

test0 很明显就是测试大锁访问量的————源代码里只使用大锁,所以并发量一高,大锁访问量也越高,自然不能通过测试;test1 结合后面 “bcachetest’s test1 uses more distinct blocks than there are buffers, and exercises lots of file system code paths.” 这句话来看,我猜应该是测高并发下程序执行的正确性的————源代码由于只用大锁,所以必然能确保正确性

所以就带来了矛盾,一方面我们想高效运行,一方面又希望程序运行的正确,可以想象这中间肯定有个度需要把握好。但是,这个 lab 的需求就是减少大锁的使用,因此,势必会出现以下两种设计方案

设计方案一:引入小锁同时,保留大锁的部分使用

设计思想如下图所示:

初始化的时候每个哈希 bucket 链表均为空;所有 buffer 都由 block cache(放到代码里即 struct bcache 结构体)掌控。随着程序运行趋于稳定,即开始了一些分配与释放之后,像下半部分这个图这样,开始出现 “散装” 的数据区域:

  • blockno#13 代表的是 0 号 bucket 的哈希链表,它被分配了两块 buffer,它用自己的链表结构表示着,这两块 buffer 受到 0 号 bucket 这把锁保护
  • blockno#41 也是同理,只不过它被分配了一块,它的锁 #2 只能保护这一块
  • 与此同时,大锁保护的数据区仍然一直存在。通过 block cache 我们仍然能找到所有 buffer,虽然目前 #0、#2、#3 被其他 bucket 占有

这种设计方案有什么好处呢?最大的好处就是避免竞争,每次我自己的 bucket 没有可用的块时,我就到公共数据区找,只要我能先抢到大锁,那么接下来我就独占整个公共数据区。这个时间段内其他人根本不可能进入 block cache,那就自然不可能发生我和你大家抢同一块 buffer 的离奇现象了,这就消除了 race condition

但可惜的是,我在遵循这种思路设计的代码仍然无法将大锁访问量降至 500 以下,我不清楚是我代码设计的问题还是说,像 6w 次并发保持在 500 以下的大锁访问量压根就不可能,所以也有可能的是这个实验根本就不想我们使用大锁(毕竟实验要求有提到 “最理想的情况下大锁访问量为 0”)

那么,这就引出第二种设计方案了

设计方案二:只使用小锁

设计思想如下

初始化的时候由其中一个哈希 bucket 持有所有的 buffer,随着程序运行趋于稳定,这个 bucket 链表的内存肯定会被分配出去,其他 bucket 都有机会拥有 buffer。最终就是所有 bucket 链表都大致上能均分全部的 buffer

可能你会觉得这种策略一开始由 bucket#0 拥有所有 buffer,这不是和设计一的思路一样了吗?其他 bucket 没有可用块的时候,设计一是往公共的 block cache 寻找,那么现在往 bucket#0 寻找,不也是往一个固定的地方找吗?

对于运行伊始,确实是这样。一开始所有 bucket# 都跑到 bucket#0 这里取 buffer 块。但是当出现一些分配和释放之后,一旦其他 bucket 链表 “抢” 到了块,那么以后涉及的分配就不只是 bucket#0 的事————比如,假设现在所有 bucket 都拥有至少一个块,那么我 blockno#41 代表的 bucket#2 链表如果没有可用块,它会先去 bucket#0 寻找可用块,如果找不到,再去 bucket#1,再找不到,就去 bucket#2…依此类推

到这个时候,设计二的高效率才体现出来。之前没可用块时,大家都挤到同一个地方找资源,现在大家都分散开来,到多个不同的地方找资源

但是,设计二是有非常大的问题的,这就是前面提过的 “小锁易死锁” 的死锁

那我怎么知道我自己设计的代码是否有死锁发生呢?以我的理解,死锁根本没办法用代码调试出来,毕竟进程的推进速度是随机的,你现在这个环境 PA 和 PB 发生死锁,下次可能就是 PC 和 PD 死锁了。那应该怎么做?我这里的方法是先画一个死锁的逻辑,然后跟着自己代码的逻辑一直走,看能不能走到画好的逻辑上。举个例子:

这是一个很简单的两个进程的死锁模型,就是彼此持有彼此请求的资源————P1 持有 #1 锁,请求 #2 锁;而 P2 持有 #2 锁,请求 #1 锁

此时要做的事情就是检查自己代码的逻辑:从 binit()bget() 再到 brelse(),看看能不能走到死锁

万一真有死锁的风险,应该怎么处理?这就涉及操作系统死锁八股文的知识了,你可以让它产生死锁,然后实现一个资源图简化的逻辑(这可以剥夺死锁进程的资源);也可以采取一些其他手段避免死锁,当然银行家算法放在这里那可太复杂了。在这个 lab 里我也思考过死锁的问题,推荐可以参考以下的资料:

  • 死锁避免算法, can_lock():这位老哥提供了一个分配锁的策略,每个 bucket 链表只能申请一半范围内的锁。由于锁的申请范围缩小了一半,请求锁时发生冲突的可能也减少了。不过,我觉得虽然看起来确实可以减少请求锁时的冲突,但冲突机率再小也有冲突的可能,比如一半范围内的两把锁死锁也不是没可能。当然如果想验证这个算法正不正确,跟着它程序逻辑走一遍可能是个好方法,但毕竟要花一点点时间,所以只能说提供一种避免死锁的思路
  • 公共区不加锁,发生竞争时再回退至公共区:这种策略我觉得比较可取。思想就是当前 bucket 链没有可用块而需要到公共区 block cache 寻找时,由于没有上锁,势必会出现两个 bucket 链表同时寻找到同一个 buffer 的情况。但是因为 buffer 属于 struct buf 结构体,这个结构体本身带锁,这把锁保证了每次只有一个进程能进入 buffer。所以,虽然大家都找到同一个 buffer,但每次只允许一个进程使用 buffer,那么这个进程使用完后,buf->refcnt 字段是会被设为 1 的,通过这个字段就可以告诉下一个进程,这个 buffer 已经被使用过了,你赶紧回到上一步重新找另外的块————这就是 “冲突后回退” 的思想

但是呢,我在真正实现的时候没有使用任何的死锁预防手段,因为我发现这个 lab 有些窍门可以避免死锁————先遍历自己的链表有没有空闲块,再遍历其他人的链表,这就可以做到避免死锁。我们来设想一个实际的场景:

这还是上面那个最简单的两两死锁模型,现在思考先遍历自己的链表再遍历其他人的链表是否会发生死锁?

如图所示,当前并发两个进程 P1 和 P2,P1 先遍历自己的链表,发现了可用的块 buffer#13。那此时 P2 能发现 P1 链表里这个块吗,答案是根本不可能,因为 P2 也会先找自己链表(这里会先找到 buffer#1)。那么换一个别的进程比如 P3,此时发现了 P1 可用的 buffer#13 呢?这确实可能发生,取决于具体的实现(如果你先请求其他 bucket 的锁,再去它的链表找可用块,那就不可能发生这种情况了),但也仅此而已。因为你别忘了访问 buffer#13 要先请求 bucket#1 这把锁,而这把锁由 P1 持有,那么也是 P1 先使用这个 buffer#13。总之,利用这个窍门可以避免死锁

那么现在大锁小锁的设计思想都分享完了,接下来是一个细节的地方,那就是 bpin()bunpin() 里面的大锁 bcache.lock 要怎么处理?

如果不处理,那么调用这两个函数的时候肯定要请求大锁,这仍会使大锁访问量居高不下,所以也要取代这个大锁。那么用哈希 bucket 的小锁可以吗?还是要定义新的其他锁?

答案是直接用哈希 bucket 小锁就够了,因为 bpin()bunpin() 其实就是将块上的计数增加或减少,而每个 cached buffer 都有自己的块号,通过这个块号可以计算出哈希 bucket,这就找到对应的哈希链了,然后申请小锁就可以


Solution

真正实现之前,有个非常非常踩坑的地方要说明的就是,一定要修改 kernel/param.h 里的宏 FSSIZE,改大一点,比如从 1000 改成 10000

由于实验要求没有指出这一点,导致我在 usertestswritebig 测试点卡在 PANIC: balloc: out of blocks 两天多,这里可真的太坑了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// path: kernel/param.h
#define FSSIZE 10000 // size of file system in blocks

// path: kernel/buf.h
struct buf {
// lab lock
uint tickstamp;

int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};

// path: kernel/bio.c
#define PRIME 13

struct hash_t {
struct spinlock lock;
struct buf list;
};
struct hash_t hash[PRIME];

struct {
struct buf buf[NBUF];
} bcache;

void
binit(void)
{
struct buf *b;

// lab lock
// 初始化哈希表的🔒
for (int i = 0; i < PRIME; ++i) {
initlock(&hash[i].lock, "bcache");
hash[i].list.next = 0;
}

for(b = bcache.buf; b < bcache.buf+NBUF; b++){
// 初始化每个 cached buffer 的🔒
initsleeplock(&b->lock, "buffer");

// 标识这个 buffer 属于哪个哈希 bucket
// bget() eviction 阶段,遍历全部 cached buffer 时
// 不使用大锁取而代之的是细粒度的锁
b->blockno = 0;

// 将全部 RENBUF 个的 cached buffer 数组都给第一个哈希 bucket
b->next = hash[0].list.next;
hash[0].list.next = b;
}
}

// bget() 会调用的一个函数封装
// 淘汰某个块,这个块由最小的 ticks 指出,传入形参 rob,表示需要抢走
static struct buf *
subbget(struct buf *rob, uint dev, uint blockno) {
int bucket = blockno % PRIME;
// victim 是待抢走的块 rob 所在的哈希链
int victim = rob->blockno % PRIME;

// 抢完 buffer 的时间点这个 buffer 从现在开始投入使用
rob->tickstamp = ticks;
rob->dev = dev;
rob->blockno = blockno;
rob->valid = 0;
rob->refcnt = 1;

if (victim != bucket) {
// 被抢的哈希链,断链
struct buf *pre = &hash[victim].list;
for (; pre->next != rob; pre = pre->next);
pre->next = rob->next;
release(&hash[victim].lock);
// 当前这个 bucket 抢完很开心,加了一块 buffer 到自己链头
rob->next = hash[bucket].list.next;
hash[bucket].list.next = rob;
}

release(&hash[bucket].lock);
acquiresleep(&rob->lock);
return rob;
}

static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

// Is the block already cached?
int bucket = blockno % PRIME;
acquire(&hash[bucket].lock);
for(b = hash[bucket].list.next; b != 0; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&hash[bucket].lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
// 缓冲不命中时,需要找最久以前(即最小)的 ticks
// 遍历所有的哈希链表,找出 ticks 最小那块
// do-while() 可以先检查自己的哈希链表
int i = bucket;
do {
// 如果当前遍历的哈希链是 bucket,那就不要再上自己的锁
if (i != bucket) acquire(&hash[i].lock);

// 遍历当前 bucket 所在的链表,找出 ticks 最小
struct buf *minibuf = 0;
uint mini = 0xffffffff;
for (b = hash[i].list.next; b != 0; b = b->next) {
if (b->refcnt == 0 && mini > b->tickstamp) {
mini = b->tickstamp;
minibuf = b;
}
}

// 在当前哈希链找到,那就直接 "抢" 走,然后就可以返回了
if (minibuf != 0)
return subbget(minibuf, dev, blockno);

if (i != bucket) release(&hash[i].lock);
i = (i + 1) % PRIME;
} while (i != bucket);

panic("bget: no buffers");
}

void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

// lab lock
int bucket = b->blockno % PRIME;
acquire(&hash[bucket].lock);

b->refcnt--;
// 只需将 ticks 清零,0 必是最快被 LRU 淘汰的
if (b->refcnt == 0) b->tickstamp = ticks;

release(&hash[bucket].lock);
}

void
bpin(struct buf *b) {
int bucket = b->blockno % PRIME;
acquire(&hash[bucket].lock);
b->refcnt++;
release(&hash[bucket].lock);
}

void
bunpin(struct buf *b) {
int bucket = b->blockno % PRIME;
acquire(&hash[bucket].lock);
b->refcnt--;
release(&hash[bucket].lock);
}

写于最后

这可是我最花时间的 lab 了,共耗时 32h27m

CATALOG
  1. 1. ABOUT
  2. 2. INTRO
  3. 3. Memory allocator (MODERATE)
    1. 3.1. 实验要求(译)(节选)
    2. 3.2. 实验思路
    3. 3.3. Solution
  4. 4. Buffer cache (HARD)
    1. 4.1. 实验要求(译)(节选)
    2. 4.2. 实验提示(译)(节选)
    3. 4.3. 实验思路
    4. 4.4. 设计方案一:引入小锁同时,保留大锁的部分使用
    5. 4.5. 设计方案二:只使用小锁
    6. 4.6. Solution
  5. 5. 写于最后