跳转至

实验五:fork 的实现、并发与锁机制

在执行每一条命令前,请你对将要进行的操作进行思考

为了你的数据安全和不必要的麻烦,请谨慎使用 sudo,并确保你了解每一条指令的含义。

1. 实验文档给出的命令不需要全部执行

2. 不是所有的命令都可以无条件执行

3. 不要直接复制粘贴命令执行

fork 的实现

在操作系统设计中,进程的控制除了创建、终止等基本操作之外,还包括了进程的复制

这种复制的操作可以用于创建子进程,被称为 fork,它可以使得用户进程具有控制多个进程的能力,从而实现并发执行。

YSOS 的 fork 系统调用设计如下描述:

出于实验设计考量:
本实现与 Linux 或 POSIX 中所定义的 fork 有所不同,也结合了 Linux 中 vfork 的行为。

  • fork 会创建一个新的进程,新进程称为子进程,原进程称为父进程。
  • 子进程在系统调用后将得到 0 的返回值,而父进程将得到子进程的 PID。 如果创建失败,父进程将得到 -1 的返回值。
  • fork 不复制父进程的内存空间,不实现 Cow (Copy on Write) 机制,即父子进程将持有一定的共享内存:代码段、数据段、堆、bss 段等。
  • fork 子进程与父进程共享内存空间(页表),但子进程拥有自己独立的寄存器和栈空间,即在一个不同的栈的地址继承原来的数据。
  • 由于上述内存分配机制的限制,fork 系统调用必须在任何 Rust 内存分配(堆内存分配)之前进行。

为了实现父子进程的资源共享,在先前的实验中,已经做了一些准备工作:

比如 pkg/kernel/src/proc/paging.rs 中,PageTableContext 中的 Cr3RegValueArc 保护了起来;在 pkg/kernel/src/proc/data.rs 中,也存在 Arc 包装的共享数据的内容。

忘了 Arc 是什么?

Arcalloc::sync 中的一个原子引用计数智能指针,它允许多个线程同时拥有对同一数据的所有权,且不会造成数据竞争。

Arcclone() 方法会增加引用计数,drop() 方法会减少引用计数,当引用计数为 0 时,数据会被释放。Arc 本身是不可变的,但可以通过 RwLock 获取内部可变性,进而安全的修改一个被多个线程所持有的数据。

对于 Windows 等将进程抽象为资源容器的操作系统,这些需要共享的资源也就会被抽象为进程对象。在这种情况下,实验所设计的行为又更类似于 “新建一个执行线程” 的操作。

系统调用

有了上一次实验的经验,系统调用的新增、处理均已经有了一定的经验,此处不过多赘述。对 fork 系统调用有如下约定,别忘了在 syscall_def 中定义你的系统调用号:

1
2
// None -> pid: u16 or 0 or -1
Syscall::Fork => { /* ... */},

如果你和笔者一样有强迫症,Linux 相关功能的系统调用号是 58

进程管理

关于具体的实现

实验至此,你也应当积累了一些自己的项目管理经验,对于上述的 FIXME,你应当有一些自己的想法,用合适的方式进行实现。

后续的完善将会给出一些提示、建议和注意事项,相关代码结构并不需要完全按照文档进行

请注意:每个 FIXME 并不代表此功能必须在对应的位置实现,你也应当自由管理相关函数的返回值、参数等。

在处理好用户态库和系统调用的对接后,参考如下代码,完善你的 fork:

往下翻翻,说明更多哦(为什么总有人不看完文档就开始写代码!)

1
2
3
4
5
6
7
8
9
pub fn fork(context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        // FIXME: save_current as parent
        // FIXME: fork to get child
        // FIXME: push to child & parent to ready queue
        // FIXME: switch to next process
    })
}
1
2
3
4
5
6
7
8
9
impl ProcessManager {
    pub fn fork(&self) {
        // FIXME: get current process
        // FIXME: fork to get child
        // FIXME: add child to process list

        // FOR DBG: maybe print the process ready queue?
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
impl Process {
    pub fn fork(self: &Arc<Self>) -> Arc<Self> {
        // FIXME: lock inner as write
        // FIXME: inner fork with parent weak ref

        // FOR DBG: maybe print the child process info
        //          e.g. parent, name, pid, etc.

        // FIXME: make the arc of child
        // FIXME: add child to current process's children list
        // FIXME: set fork ret value for parent with `context.set_rax`
        // FIXME: mark the child as ready & return it
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
pub struct ProcessInner {
    // ...
    parent: Option<Weak<Process>>,
    children: Vec<Arc<Process>>,
    // ...
}

impl ProcessInner {
    pub fn fork(&mut self, parent: Weak<Process>) -> ProcessInner {
        // FIXME: fork the process virtual memory struct
        // FIXME: calculate the real stack offset
        // FIXME: update `rsp` in interrupt stack frame
        // FIXME: set the return value 0 for child with `context.set_rax`

        // FIXME: clone the process data struct

        // FIXME: construct the child process inner

        // NOTE: return inner because there's no pid record in inner
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
impl ProcessVm {
    pub fn fork(&self, stack_offset_count: u64) -> Self {
        // clone the page table context (see instructions)
        let owned_page_table = self.page_table.fork();

        let mapper = &mut owned_page_table.mapper();
        let alloc = &mut *get_frame_alloc_for_sure();

        Self {
            page_table: owned_page_table,
            stack: self.stack.fork(mapper, alloc, stack_offset_count),
        }
    }
}

impl Stack {
    pub fn fork(
        &self,
        mapper: MapperRef,
        alloc: FrameAllocatorRef,
        stack_offset_count: u64,
    ) -> Self {
        // FIXME: alloc & map new stack for child (see instructions)

        // FIXME: copy the *entire stack* from parent to child

        // FIXME: return the new stack
        Self {
            range: /* new stack range */,
            usage: /* new stack usage */
        }
    }
}

关于具体的代码实现,参考如下的提示和说明:

  1. 将功能的具体实现委托至下一级进行,保持代码语义的简洁。

    • 系统调用静态函数,并将其委托给 ProcessManager::fork
    • ProcessManager::fork 将具体实现委托给当前进程的 Process::fork
    • Process::fork 将具体实现委托给 ProcessInner::fork

    每一层代码只关心自己层级的逻辑和数据,不关心持有自身的锁或其他外部数据的状态,进而提高代码可维护性。

  2. 使用先前实现的 save_currentswitch_next 等函数,提高代码复用性。

    如果使用时遇到了问题,很可能是你的代码过于相互耦合,尝试将逻辑进行分离,保证函数功能的单一性。

  3. 分别为父子进程设置返回值。

    进程调用系统调用后,会根据恢复的寄存器的值获取系统调用的返回值。

    对于 fork 系统调用,需要为父进程和子进程设置不同的返回值,这意味着为他们不同的 context 设置 rax 寄存器。

    设置子进程的 context 时,先根据父进程进行复制,并在复制后修改 rax 为 0。

  4. 使用 Arc::downgrade 获取 Weak 引用,从而避免循环引用。

    父进程持有子进程的强引用,子进程持有父进程的弱引用,这样可以避免循环引用导致的内存泄漏。

  5. 为了复制栈空间,你可以使用 core::intrinsics::copy_nonoverlapping 函数。

    这个函数会使用底层 LLVM 所提供的内存复制相关指令,具有较高的性能。需要调用侧保证源和目标的内存空间不会重叠。可以封装为如下函数进行使用:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    /// Clone a range of memory
    ///
    /// - `src_addr`: the address of the source memory
    /// - `dest_addr`: the address of the target memory
    /// - `size`: the count of pages to be cloned
    fn clone_range(&self, cur_addr: u64, dest_addr: u64, size: u64) {
        trace!("Clone range: {:#x} -> {:#x}", cur_addr, dest_addr);
        unsafe {
            copy_nonoverlapping::<u64>(
                cur_addr as *mut u64,
                dest_addr as *mut u64,
                (size * Size4KiB::SIZE / 8) as usize,
            );
        }
    }
    
  6. 记录父子进程共用的页表。

    可以使用 Arc 来提供引用计数,来确保进程逐个退出时,只有最后一个退出的进程会进行页表内容的释放。为此,你需要补充一些相关的函数调用:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    impl PageTableContext {
        // ...
        pub fn using_count(&self) -> usize {
            Arc::strong_count(&self.reg)
        }
    
        pub fn fork(&self) -> Self {
            // forked process shares the page table
            Self {
                reg: self.reg.clone(),
            }
        }
        // ...
    }
    

    也可以补充一些相关的调试信息:

    1
    2
    3
    4
    5
    6
    7
    impl Debug for PageTableContext {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // ...
                .field("refs", &self.using_count())
            // ...
        }
    }
    
  7. 为子进程分配合适的栈空间。

    通过子进程数量、页表引用计数、当前父进程的栈等信息,为子进程分配合适的栈空间。

    下面是一个比较常规的期望的栈空间分配结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    +---------------------+ <- 0x400000000000
    |    Parent Stack     |
    +---------------------+ <- 0x3FFF00000000
    |    Child 1 Stack    |
    +---------------------+ <- 0x3FFE00000000
    |    Child 2 Stack    |
    +---------------------+ <- 0x3FFD00000000
    |         ...         |
    +---------------------+
    

    这样的栈布局在复杂情况下可能会造成栈复用,在这种情况下进行 map_range 会失败,从而你可以继续寻找合适的偏移:

    1
    2
    3
    4
    5
    while elf::map_range(/* page range to be mapped */).is_err()
    {
        trace!("Map thread stack to {:#x} failed.", new_stack_base);
        new_stack_base -= STACK_MAX_SIZE; // stack grow down
    }
    

    你也可以用 bitmap 等结构体记录栈的释放,或使用其他方式进行合理的分配。

    此处的实现很灵活,也无需完全按照上述栈规划进行,你可以自行对算法或分布进行设计。

    更通用的实现

    在更好的实现中,fork 并不复制整个栈,操作系统会启用 fork 后全部页面的写保护。

    在任意进程尝试写入时,再对整个页面进行复制。这种策略被称为写时复制(Copy on Write,COW),它可以大大减少内存的使用和开销,提高性能。

功能测试

在完成了 fork 的实现后,你需要通过如下功能测试来验证你的实现是否正确:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#![no_std]
#![no_main]

extern crate alloc;
extern crate lib;

use lib::*;

static mut M: u64 = 0xdeadbeef;

fn main() -> isize {
    let mut c = 32;

    let pid = sys_fork();

    if pid == 0 {
        println!("I am the child process");

        assert_eq!(c, 32);

        unsafe {
            println!("child read value of M: {:#x}", M);
            M = 0x2333;
            println!("child changed the value of M: {:#x}", M);
        }

        c += 32;
    } else {
        println!("I am the parent process");

        sys_stat();

        assert_eq!(c, 32);

        println!("Waiting for child to exit...");

        let ret = sys_wait_pid(pid);

        println!("Child exited with status {}", ret);

        assert_eq!(ret, 64);

        unsafe {
            println!("parent read value of M: {:#x}", M);
            assert_eq!(M, 0x2333);
        }

        c += 1024;

        assert_eq!(c, 1056);
    }

    c
}

entry!(main);

进程的阻塞与唤醒

进程的阻塞与唤醒是非常重要的功能,它可以用于控制进程的执行顺序、资源的分配、进程的同步等。在先前的实现中,已经实现了 wait_pid 系统调用,它通过轮询的方式来等待一个进程的退出,并返回其退出状态。

相对而言,轮询会消耗大量的 CPU 时间,因此需要一种更为高效的方式来进行进程的阻塞与唤醒:能否让进程在等待某个事件发生时进入阻塞状态,而在事件发生时唤醒进程呢?

在本实验的设计中,对于 wait_pid 系统调用,进程在发出调用后,会进入一个等待队列,当被等待的进程退出时,会唤醒等待队列中的进程。

等待队列

pkg/kernel/src/proc/manager.rs 中,修改 ProcessManager 并添加等待队列:

1
2
3
4
pub struct ProcessManager {
    // ...
    wait_queue: Mutex<BTreeMap<ProcessId, BTreeSet<ProcessId>>>,
}

其中,BTreeMap 的键值对应于被等待的进程 ID,BTreeSet 用于存储等待的进程 ID 的集合。

阻塞进程

ProcessManager 添加 block 函数,用于将进程设置为阻塞状态:

1
2
3
4
5
6
/// Block the process with the given pid
pub fn block(&self, pid: ProcessId) {
    if let Some(proc) = self.get_proc(&pid) {
        // FIXME: set the process as blocked
    }
}

设置进程的私有字段

ProcessInner 中的 status 字段是私有的,对于一系列的状态变更,你可以添加相应的函数来进行设置。

例如,你可以添加 block 函数,来设置进程的状态:

1
2
3
pub fn block(&mut self) {
    self.status = ProcessStatus::Blocked;
}

pkg/kernel/src/proc/mod.rs 中,修改 wait_pid 系统调用的实现,添加 ProcessContext 参数来确保可以进行可能的切换上下文操作(意味着当前进程被阻塞,需要切换到下一个进程):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
pub fn wait_pid(pid: ProcessId, context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        if let Some(ret) = manager.get_exit_code(pid) {
            context.set_rax(ret as usize);
        } else {
            manager.wait_pid(pid);
            manager.save_current(context);
            manager.current().write().block();
            manager.switch_next(context);
        }
    })
}

同时为 ProcessManager 添加 wait_pid 函数:

1
2
3
4
5
pub fn wait_pid(&self, pid: ProcessId) {
    let mut wait_queue = self.wait_queue.lock();
    // FIXME: push the current process to the wait queue
    //        `processor::current_pid()` is waiting for `pid`
}

使用 BTreeMapentry 方法

BTreeMapentry 方法可以用于获取一个键对应的值的可变引用,并可以通过 or_default 或者 or_insert 来插入一个默认值。

1
2
3
let mut map: BTreeMap<u32, BTreeSet<u32>> = BTreeMap::new();
let entry = map.entry(42).or_default();
entry.insert(2333);

唤醒进程

在阻塞进程后,还需要对进程进行唤醒。对于本处的 wait_pid 系统调用,当被等待的进程退出时,需要唤醒等待队列中的进程。

首先,为 ProcessManager 添加 wake_up 函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
/// Wake up the process with the given pid
///
/// If `ret` is `Some`, set the return value of the process
pub fn wake_up(&self, pid: ProcessId, ret: Option<isize>) {
    if let Some(proc) = self.get_proc(&pid) {
        let mut inner = proc.write();
        if let Some(ret) = ret {
            // FIXME: set the return value of the process
            //        like `context.set_rax(ret as usize)`
        }
        // FIXME: set the process as ready
        // FIXME: push to ready queue
    }
}

在进程退出时,也即 kill 系统调用中,需要唤醒等待队列中的进程。修改 ProcessManager 中的 kill 函数:

1
2
3
4
5
6
7
8
9
pub fn kill(pid: ProcessId, ret: isize) {
    // ...

    if let Some(pids) = self.wait_queue.lock().remove(&pid) {
        for pid in pids {
            self.wake_up(pid, Some(ret));
        }
    }
}

这样,就实现了一个无需轮询的进程阻塞与唤醒机制。

阶段性成果

尝试在你的 Shell 中启动另一个 Shell,然后在其中利用 ps 打印进程信息:

前一个 Shell 应当被阻塞 (Blocked),直到后一个 Shell 退出。

并发与锁机制

由于并发执行时,线程的调度顺序无法预知,进而造成的执行顺序不确定,持有共享资源的进程之间的并发执行可能会导致数据的不一致,最终导致相同的程序产生一系列不同的结果,这样的情况被称之为竞态条件(race condition)

条件竞争……?

恶意程序利用类似的原理,通过不断地尝试,最终绕过检查,获得了一些不应该被访问的资源,这种对系统的攻击行为也被称为条件竞争。

它们的英文翻译都是 Race Condition,但在不同的领域内常用不同的翻译。

一个著名的例子是 Linux 内核权限提升漏洞 Dirty COW (CVE-2016-5195),通过条件竞争使得普通用户可以写入原本只读的内存区域,从而提升权限。

考虑如下的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
static mut COUNTER: usize = 0;

fn main() {
    let mut handles = vec![];

    for _ in 0..10 {
        handles.push(std::thread::spawn(|| {
            for _ in 0..1000 {
                unsafe {
                    COUNTER += 1;
                }
            }
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", unsafe { COUNTER });
}

可以直接使用 rustc main.rs 进行编译

得到的结果如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
$ for ((i = 0; i < 16; i++)); do ./main; done
Result: 9595
Result: 8838
Result: 8315
Result: 7602
Result: 9120
Result: 8485
Result: 8831
Result: 8717
Result: 8812
Result: 8955
Result: 9266
Result: 8168
Result: 9159
Result: 10000
Result: 9664
Result: 10000

可以看到,每次运行的结果都可能不一样,这是因为 COUNTER += 1 操作并不是原子的,它包含了读取、修改和写入三个步骤,而在多线程环境下,这三个步骤之间可能会被其他线程(通过操作系统的时钟中断或其他方式)打断,反汇编上述代码,可以看到 COUNTER += 1 的实际操作:

1
2
3
4
mov rax, qword [obj.main::COUNTER::hfb966cd5c23908b7] # read COUNTER to rax
add rax, 1                                            # rax += 1
# ... overflow check by rustc ...
mov qword [obj.main::COUNTER::hfb966cd5c23908b7], rax # write rax to COUNTER

考虑如下的执行顺序(实际执行的时钟中断会慢得多,所以上述代码使用循环来凸显这一问题):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# Thread 1
mov rax, qword [obj.main::COUNTER::hfb966cd5c23908b7]
add rax, 1

# !!! Context Switch !!!

# Thread 2
mov rax, qword [obj.main::COUNTER::hfb966cd5c23908b7]

# !!! Context Switch !!!

# Thread 1
mov qword [obj.main::COUNTER::hfb966cd5c23908b7], rax

# !!! Context Switch !!!

# Thread 2
add rax, 1
mov qword [obj.main::COUNTER::hfb966cd5c23908b7], rax

在这样的执行顺序下,COUNTER 的值会比预期少,几个线程可能会同时读取到相同的值,然后同时写入相同的值,这样的行为就会导致 += 的语意被破坏。

上面这种访问共享资源的代码片段被称为临界区,为了保证临界区的正确性,需要确保每次只有一个线程可以进入临界区,也即保证这部分指令序列是互斥的。

原子指令

一般而言,为了解决并发任务带来的问题,需要通过指令集中的原子操作来保证数据的一致性。在 Rust 中,这类原子指令被封装在 core::sync::atomic 模块中,作为架构无关的原子操作来提供并发安全性。

AtomicUsize 为例,它提供了一系列的原子操作,如 fetch_addfetch_updatecompare_exchange 等,这些操作都是原子的,不会被其他线程打断,对于之前的例子:

1
2
static COUNTER: AtomicUsize = AtomicUsize::new(0);
COUNTER.fetch_add(1, Ordering::SeqCst);

其中 Ordering 用户控制内存顺序,在单核情况下,Ordering 的选择并不会影响程序的行为,可以简单了解,并尝试回答思考题 4 的内容。

在编译器优化后将会被编译为:

1
lock inc qword [obj.main::COUNTER::h2889e4585a2a2d30]

这就是一句原子的 inc 指令,中断或任务切换都不会打断这个指令的执行,从而保证了 COUNTER 的一致性。

在了解了原子指令的基本概念后,可以利用它来为用户态程序提供两种简单的同步操作:自旋锁 SpinLock 和信号量 Semaphore。其中自旋锁的实现并不需要内核态的支持,而信号量则会涉及到进程调度等操作,需要内核态的支持。

正因如此,在进行内核编写的过程中遇到的 MutexRwLock 等用于保障内核态数据一致性的锁机制均是基于自旋锁实现的你可能在之前的实验中遇到过系统因为自旋忙等待导致的异常情况

自旋锁

自旋锁 SpinLock 是一种简单的锁机制,它通过不断地检查锁的状态来实现线程的阻塞,直到获取到锁为止。

pkg/lib/src/sync.rs 中,关注 SpinLock 的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
pub struct SpinLock {
    bolt: AtomicBool,
}

impl SpinLock {
    pub const fn new() -> Self {
        Self {
            bolt: AtomicBool::new(false),
        }
    }

    pub fn acquire(&mut self) {
        // FIXME: acquire the lock, spin if the lock is not available
    }

    pub fn release(&mut self) {
        // FIXME: release the lock
    }
}

// Why? Check reflection question 5
unsafe impl Sync for SpinLock {}

在实现 acquirerelease 时,你需要使用 AtomicBool 的原子操作来保证锁的正确性:

  • load 函数用于读取当前值。
  • store 函数用于设置新值。
  • compare_exchange 函数用于原子地进行比较-交换,也即比较当前值是否为目标值,如果是则将其设置为新值,否则返回当前值。

在进行循环等待时,可以使用 core::hint::spin_loop 提高性能,在 x86_64 架构中,它实际上会编译为 pause 指令。

信号量

得利于 Rust 良好的底层封装,自旋锁的实现非常简单。但是也存在一定的问题:

  • 忙等待:自旋锁会一直占用 CPU 时间,直到获取到锁为止,这会导致 CPU 利用率的下降。
  • 饥饿:如果一个线程一直占用锁,其他线程可能会一直无法获取到锁。
  • 死锁:如果两个线程互相等待对方占有的锁,就会导致死锁。

信号量 Semaphore 是一种更为复杂的同步机制,它可以用于控制对共享资源的访问,也可以用于控制对临界区的访问。通过与进程调度相关的操作,信号量还可以用于控制进程的执行顺序、提高 CPU 利用率等。

信号量需要实现四种操作:

  • new:根据所给出的 key 创建一个新的信号量。
  • remove:根据所给出的 key 删除一个已经存在的信号量。
  • signal:也叫做 V 操作,也可以被 release/up/verhogen 表示,它用于释放一个资源,使得等待的进程可以继续执行。
  • wait:也叫做 P 操作,也可以被 acquire/down/proberen 表示,它用于获取一个资源,如果资源不可用,则进程将会被阻塞。

为了实现与内核的交互,信号量的操作将被实现为一个系统调用,它将使用到三个系统调用参数:

1
2
// op: u8, key: u32, val: usize -> ret: any
Syscall::Sem => sys_sem(&args, context),

其中 op 为操作码,key 为信号量的键值,val 为信号量的值,ret 为返回值。根据先前的约定,op 被放置在 rdi 寄存器中,keyval 分别被放置在 rsirdx 寄存器中,可以通过 args.arg0args.arg1args.arg2 来进行访问。

信号量相关内容在 pkg/kernel/src/proc/sync.rs 中进行实现:

“资源” 被抽象为一个 usize 整数,它并不需要使用 AtomicUsize,为了存储等待的进程,需要在此整数外额外使用一个 Vec 来存储等待的进程。它们二者将会被一个自旋锁实现的互斥锁(在内核中直接使用 spin::Mutex)保护。

1
2
3
4
pub struct Semaphore {
    count: usize,
    wait_queue: VecDeque<ProcessId>,
}

信号量操作的结果使用 SemaphoreResult 表示:

1
2
3
4
5
6
pub enum SemaphoreResult {
    Ok,
    NotExist,
    Block(ProcessId),
    WakeUp(ProcessId),
}
  • Ok:表示操作成功,且无需进行阻塞或唤醒。
  • NotExist:表示信号量不存在。
  • Block(ProcessId):表示操作需要阻塞线程,一般是当前进程。
  • WakeUp(ProcessId):表示操作需要唤醒线程。

为了实现信号量的 KV 存储,使用 SemaphoreSet 定义信号量集合的操作:

1
2
3
pub struct SemaphoreSet {
    sems: BTreeMap<SemaphoreId, Mutex<Semaphore>>,
}

并在 ProcessData 中添加为线程共享资源:

1
2
3
4
5
pub struct ProcessData {
    // ...
    pub(super) semaphores: Arc<RwLock<SemaphoreSet>>,
    // ...
}

关于这里的一堆锁……

在本实验实现的单核处理器下,Semaphore 的实现似乎并不需要内部的 Mutex 进行保护,只需要外部的 RwLock 进行保护即可。

但在多核处理器下,Semaphore 的实现可能会涉及到多个核心的并发访问,因此需要使用 Mutex 来提供更细粒度的锁保护。在进行添加、删除操作时,对 RwLock 使用 write 获取写锁,而在进行 signalwait 操作时,对 RwLock 使用 read 来获取更好的性能和控制。

综上考量,这里就保留了 Mutex 的使用。

由于信号量会阻塞进程,所以需要在系统调用的处理中按照信号量的返回值进行相关进程操作,一个代码示例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
pub fn sem_wait(key: u32, context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        let pid = processor::current_pid();
        let ret = manager.current().write().sem_wait(key, pid);
        match ret {
            SemaphoreResult::Ok => context.set_rax(0),
            SemaphoreResult::NotExist => context.set_rax(1),
            SemaphoreResult::Block(pid) => {
                // FIXME: save, block it, then switch to next
                //        use `save_current` and `switch_next`
            }
            _ => unreachable!(),
        }
    })
}

请参考实验代码给出的相关注释内容,完成信号量的实现、不同操作的系统调用服务实现,最后完善作为系统调用的 sys_sem

1
2
3
4
5
6
7
8
9
pub fn sys_sem(args: &SyscallArgs, context: &mut ProcessContext) {
    match args.arg0 {
        0 => context.set_rax(new_sem(args.arg1 as u32, args.arg2)),
        1 => context.set_rax(remove_sem(args.arg1 as u32)),
        2 => sem_signal(args.arg1 as u32, context),
        3 => sem_wait(args.arg1 as u32, context),
        _ => context.set_rax(usize::MAX),
    }
}

进程的唤醒

wait_pid 系统调用类似,你需要在 sem_signal 中对进程进行唤醒。

但是此处无需为进程设置返回值,因此在调用 wake_up 时,传入 None 即可。

完善用户库

完善 pkg/lib/src/sync.rs 中有关信号量的操作,使用不同的 op 参数来进行信号量的用户态函数的分配,系统调用宏需要将参数转换为 usize 类型,可以参考如下声明:

1
2
3
4
#[inline(always)]
pub fn sys_new_sem(key: u32, value: usize) -> bool {
    syscall!(Syscall::Sem, 0, key as usize, value) == 0
}

测试任务

在实现了 SpinLockSemaphore 的基础上,你需要完成如下的用户程序任务来测试你的实现:

多线程计数器

在所给代码的 pkg/app/counter 中实现了一个多线程计数器,多个线程对一个共享的计数器进行累加操作,最终输出计数器的值。

为了提供足够大的可能性来触发竞态条件,该程序使用了一些手段来刻意构造一个临界区,这部分代码不应被修改。

你需要通过上述两种方式,分别保护该临界区,使得计数器的值最终为 800

如何声明锁变量才能让它在进程间共享?根据上文的实现和你的理解,让它们正确发挥作用。

尝试修改代码,使用两组线程分别测试 SpinLockSemaphore

一个参考代码行为如下,你可以参考这种形式修改和编写测试程序,而无需分成两个用户程序:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
fn main() -> isize {
    let pid = sys_fork();

    if pid == 0 {
        test_semaphore();
    } else {
        test_spin();
        sys_wait_pid(pid);
    }

    0
}

你可以在 test_spintest_semaphore 中分别继续 fork 更多的进程用来实际测试。

消息队列

创建一个用户程序 pkg/app/mq,结合使用信号量,实现一个消息队列:

  • 父进程使用 fork 创建额外的 16 个进程,其中一半为生产者,一半为消费者。

  • 生产者不断地向消息队列中写入消息,消费者不断地从消息队列中读取消息。

  • 每个线程处理的消息总量共 10 条。

    即生产者会产生 10 个消息,每个消费者只消费 10 个消息。

  • 在每个线程生产或消费的时候,输出相关的信息。

    你可能需要使用信号量或旋锁来实现一个互斥锁,保证操作和信息输出之间不会被打断。

  • 在生产者和消费者完成上述操作后,使用 sys_exit(0) 直接退出。

  • 最终使用父进程等待全部的子进程退出后,输出消息队列的消息数量。

  • 在父进程创建完成 16 个进程后,使用 sys_stat 输出当前的全部进程的信息。

你需要保证最终消息队列中的消息数量为 0,你可以开启内核更加详细的日志,并使用输出的相关信息尝试证明队列的正常工作:

  • 在从队列取出消息时,消息为空吗?
  • 在向队列写入消息时,队列是否满了?
  • 在队列为空时,消费者是否被阻塞?
  • 在队列满时,生产者是否被阻塞?

分别设置队列容量为 1, 4, 8, 16,记录观察生产者和消费者的行为:

  1. 队列的元素计数是否没有超过容量?
  2. 生产者和消费者是否能够交替工作?
  3. 尝试观察到多个生产者或消费者同时进行同种操作的情况。

哲学家的晚饭

假设有 5 个哲学家,他们的生活只是思考和吃饭。这些哲学家共用一个圆桌,每位都有一把椅子。在桌子中央有一碗米饭,在桌子上放着 5 根筷子。

当一位哲学家思考时,他与其他同事不交流。时而,他会感到饥饿,并试图拿起与他相近的两根筷子(筷子在他和他的左或右邻居之间)。

一个哲学家一次只能拿起一根筷子。显然,他不能从其他哲学家手里拿走筷子。当一个饥饿的哲学家同时拥有两根筷子时,他就能吃。在吃完后,他会放下两根筷子,并开始思考。

创建一个用户程序 pkg/app/dinner,使用课上学到的知识,实现并解决哲学家就餐问题:

  • 创建一个程序,模拟五个哲学家的行为。

  • 每个哲学家都是一个独立的线程,可以同时进行思考和就餐。

  • 使用互斥锁来保护每个筷子,确保同一时间只有一个哲学家可以拿起一根筷子。

  • 使用等待操作调整哲学家的思考和就餐时间,以增加并发性和实际性。

    • 如果你实现了 sys_time 系统调用(Lab 4),可以使用它来构造 sleep 操作。
    • 如果你并没有实现它,可以参考多线程计数器中的 delay 函数进行实现。
  • 当哲学家成功就餐时,输出相关信息,如哲学家编号、就餐时间等。

  • 向程序中引入一些随机性,例如在尝试拿筷子时引入一定的延迟,模拟竞争条件和资源争用。

  • 可以设置等待时间或循环次数,以确保程序能够运行足够长的时间,并尝试观察到不同的情况,如死锁和饥饿。

在用户态中引入伪随机数

Rust 提供了一些伪随机数生成器,你可以使用 rand 库来引入一些随机性,以模拟不同的情况。

1
2
3
[dependencies]
rand = { version = "0.8", default-features = false }
rand_chacha = { version = "0.3", default-features = false }

在无标准库的环境下,你需要为伪随机数生成器提供种子。

如果你实现了 sys_time 系统调用,这会是一个很方便的种子。

如果你没有实现,不妨试试使用 sys_getpid 或者 fork 顺序等数据作为种子来生成随机数。

你可以将上述功能封装到用户库中,以便在用户程序中使用。

ChaCha20Rng 伪随机数生成器为例,使用相关方法获取随机数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
use rand::prelude::*;
use rand_chacha::ChaCha20Rng;

fn main() {
    // ...
    let time = lib::sys_time();
    let mut rng = ChaCha20Rng::seed_from_u64(time.timestamp() as u64);
    println!("Random number: {}", rng.gen::<u64>());
    // ...
}

相关文档请查阅:The Rust Rand Book

通过观察程序的输出和行为,请尝试构造并截图记录以下现象:

  • 某些哲学家能够成功就餐,即同时拿到左右两侧的筷子。
  • 尝试构造死锁情况,即所有哲学家都无法同时拿到他们需要的筷子。
  • 尝试构造饥饿情况,即某些哲学家无法获得足够的机会就餐。

尝试解决上述可能存在的问题,并介绍你的解决思路。

声明一系列的信号量

pkg/lib/src/sync.rs 中,提供了 semaphore_array 宏,可以用于快速声明一系列信号量:

1
static CHOPSTICK: [Semaphore; 5] = semaphore_array![0, 1, 2, 3, 4];
可能的解决思路……

分布式系统中,常见的解决思路是引入一个“服务生”来协调哲学家的就餐。

这个服务生会控制筷子的分配,从而避免死锁和饥饿的情况

思考题

  1. 在 Lab 2 中设计输入缓冲区时,如果不使用无锁队列实现,而选择使用 Mutex 对一个同步队列进行保护,在编写相关函数时需要注意什么问题?考虑在进行 pop 操作过程中遇到串口输入中断的情形,尝试描述遇到问题的场景,并提出解决方案。

  2. 在进行 fork 的复制内存的过程中,系统的当前页表、进程页表、子进程页表、内核页表等之间的关系是怎样的?在进行内存复制时,需要注意哪些问题?

  3. 为什么在实验的实现中,fork 系统调用必须在任何 Rust 内存分配(堆内存分配)之前进行?如果在堆内存分配之后进行 fork,会有什么问题?

  4. 进行原子操作时候的 Ordering 参数是什么?此处 Rust 声明的内容与 C++20 规范 中的一致,尝试搜索并简单了解相关内容,简单介绍该枚举的每个值对应于什么含义。

  5. 在实现 SpinLock 的时候,为什么需要实现 Sync trait?类似的 Send trait 又是什么含义?

  6. core::hint::spin_loop 使用的 pause 指令和 Lab 4 中的 x86_64::instructions::hlt 指令有什么区别?这里为什么不能使用 hlt 指令?

加分项

  1. 🤔 尝试实现如下用户程序任务,完成用户程序 fish

    • 创建三个子进程,让它们分别能输出且只能输出 ><_
    • 使用学到的方法对这些子进程进行同步,使得打印出的序列总是 <><_><>_ 的组合。

    在完成这一任务的基础上,其他细节可以自行决定如何实现,包括输出长度等。

  2. 🤔 尝试和前文不同的其他方法解决哲学家就餐问题,并验证你的方法能够正确解决它,简要介绍你的方法,并给出程序代码和测试结果。

  3. 🔥 尝试使用符合 Rust 做法的方式处理互斥锁,使用 RAII 的方式来保证锁的释放:

    RAII(Resource Acquisition Is Initialization)是一种资源获取即初始化的技术,它通过在对象的构造函数中获取资源,然后在析构函数中释放资源的方法,来保证资源的正确释放。

    对于 Rust,也即实现 MutexGuard 类似的结构,它在构造时获取锁,然后在此结构体被移出作用域时释放锁。

    • acquire 时候返回 MutexGuard 对象。
    • 移除 release 函数,使用 MutexGuardDrop trait 来释放锁。

    本项实现难度较大,不建议初学者尝试。

    本加分项涉及到生命周期、unsafe 构造、mem::forgetDeref 等内容,需要对 Rust 的底层实现有一定的了解。

    作为本加分项的备选方案,可以尝试查看 spin crate 及其依赖的的源码,了解其实现方式,并进行一些描述、记录和学习。