进程同步实验:生产者-消费者问题实现全解
在操作系统的并发编程中,生产者-消费者问题 是最经典的进程同步案例之一。这篇文章记录了我从零开始实现这个问题的完整过程,包括理论理解、关键设计决策和代码实现细节。
实验背景
当多个进程需要共享有限的资源时,就会产生 资源竞争 问题。在操作系统中,这种现象通常表现为:
- 间接制约:多个进程共享同一资源(如缓冲区),因系统需要协调使用资源而相互制约
- 直接制约:进程合作过程中,各自的执行结果互为对方的执行条件,从而限制执行速度
生产者-消费者问题正是这两种制约关系的完美体现。
实验要求
实验内容
使用多线程编程技术实现经典的生产者-消费者问题:
- 创建 3 个生产者线程和 2 个消费者线程
- 设计一个容量为 4 的有界缓冲区供生产者和消费者共享
- 使用互斥锁和条件变量实现对缓冲区的同步访问控制
- 生产者向缓冲区写入 24 个产品,消费者取出并消费
- 记录整个过程的日志,验证同步机制的正确性
实验目的
通过这个实验,我需要掌握:
- 同步与互斥的理论基础 —— 理解临界资源、互斥与同步的概念
- 同步原语的应用 —— 学会使用
mutex和condition_variable实现进程协调 - 并发问题的分析与解决 —— 识别并发冲突、设计同步方案
- 有界资源的管理 —— 理解有界缓冲区的流量控制机制
- 工程实践意识 —— 建立高质量并发程序的规范意识
关键概念解析
互斥(Mutual Exclusion)
互斥用于 保护临界资源 免受并发访问冲突。在本实验中:
- 每个缓冲区槽位都是临界资源
- 通过
slotLocked标记和互斥锁实现互斥 - 确保同一时刻只有一个线程能操作某个缓冲区
同步(Synchronization)
同步用于 协调进程的执行顺序与速度。关键点包括:
- 生产者需要等待有空的缓冲区才能写入(缓冲区满时阻塞)
- 消费者需要等待有数据的缓冲区才能读取(缓冲区空时阻塞)
- 通过条件变量实现高效的等待和唤醒
条件变量(Condition Variable)
条件变量是 C++ 标准库提供的同步机制:
condition_variable notFull; // 缓冲区非满的条件
condition_variable notEmpty; // 缓冲区非空的条件
// 生产者等待:缓冲区有空间时被唤醒
notFull.wait(lk, [] { return findUnlockedEmptySlotUnsafe() != -1; });
// 消费者等待:缓冲区有数据时被唤醒
notEmpty.wait(lk, [] { return findUnlockedFullSlotUnsafe() != -1; });
核心实现
数据结构定义
// 缓冲区容量和线程数量
static constexpr int BUFFER_SIZE = 4;
static constexpr int PRODUCER_COUNT = 3;
static constexpr int CONSUMER_COUNT = 2;
static constexpr int TOTAL_ITEMS = 24;
// 缓冲区数据和锁标记
int bufferSlots[BUFFER_SIZE] = {0, 0, 0, 0};
bool slotLocked[BUFFER_SIZE] = {false, false, false, false};
// 同步原语
mutex globalMutex;
condition_variable notFull;
condition_variable notEmpty;
// 生产消费计数
int producedCount = 0;
int consumedCount = 0;
关键函数
1. 查找空缓冲区
/**
* @brief 查找一个未加锁且为空的缓冲区槽位。
* @return 可用槽位下标;若没有则返回 -1。
* @note 需在持有 globalMutex 时调用。
*/
int findUnlockedEmptySlotUnsafe() {
for (int i = 0; i < BUFFER_SIZE; ++i) {
if (!slotLocked[i] && bufferSlots[i] == 0) {
return i;
}
}
return -1;
}
2. 查找满缓冲区
/**
* @brief 查找一个未加锁且已有资源的缓冲区槽位。
* @return 可消费槽位下标;若没有则返回 -1。
* @note 需在持有 globalMutex 时调用。
*/
int findUnlockedFullSlotUnsafe() {
for (int i = 0; i < BUFFER_SIZE; ++i) {
if (!slotLocked[i] && bufferSlots[i] == 1) {
return i;
}
}
return -1;
}
3. 生产者线程
/**
* @brief 生产者线程函数。
* @param producerId 生产者编号,用于日志区分。
*/
void producer(int producerId) {
// ... 随机数生成器初始化 ...
while (true) {
unique_lock<mutex> lk(globalMutex);
if (producedCount >= TOTAL_ITEMS) {
break;
}
int index = findUnlockedEmptySlotUnsafe();
if (index == -1) {
cout << "生产者 " << producerId << " 找不到空缓冲区,等待中..." << '\n';
// 没有空槽位时,等待消费者释放空间或生产已完成。
notFull.wait(lk, [] {
return producedCount >= TOTAL_ITEMS || findUnlockedEmptySlotUnsafe() != -1;
});
continue;
}
// 先锁定槽位,再写入,避免其他线程同时操作同一位置。
slotLocked[index] = true;
bufferSlots[index] = 1;
++producedCount;
cout << "生产者 " << producerId << " 生产1个资源 -> 缓冲区 " << index
<< ",总生产=" << producedCount << '\n';
printBufferUnsafe();
// 写入完成后解锁,允许消费者读取。
slotLocked[index] = false;
lk.unlock();
notEmpty.notify_one();
this_thread::sleep_for(chrono::milliseconds(sleepDist(rng)));
}
notEmpty.notify_all();
}
4. 消费者线程
/**
* @brief 消费者线程函数。
* @param consumerId 消费者编号,用于日志区分。
*/
void consumer(int consumerId) {
// ... 随机数生成器初始化 ...
while (true) {
unique_lock<mutex> lk(globalMutex);
if (consumedCount >= TOTAL_ITEMS) {
break;
}
int index = findUnlockedFullSlotUnsafe();
if (index == -1) {
if (producedCount >= TOTAL_ITEMS) {
bool hasFull = findUnlockedFullSlotUnsafe() != -1;
if (!hasFull) {
break;
}
}
cout << "消费者 " << consumerId << " 找不到满缓冲区,等待中..." << '\n';
// 没有可消费资源时,等待生产者投放或生产已结束。
notEmpty.wait(lk, [] {
if (findUnlockedFullSlotUnsafe() != -1) {
return true;
}
return producedCount >= TOTAL_ITEMS;
});
if (findUnlockedFullSlotUnsafe() == -1 && producedCount >= TOTAL_ITEMS) {
break;
}
continue;
}
// 先锁定槽位,再清空资源,避免并发读写冲突。
slotLocked[index] = true;
bufferSlots[index] = 0;
++consumedCount;
cout << "消费者 " << consumerId << " 消费1个资源 <- 缓冲区 " << index
<< ",总消费=" << consumedCount << '\n';
printBufferUnsafe();
// 消费完成后解锁,通知生产者可以继续投放。
slotLocked[index] = false;
lk.unlock();
notFull.notify_one();
this_thread::sleep_for(chrono::milliseconds(sleepDist(rng)));
}
notFull.notify_all();
}
设计要点与陷阱
1. 锁的原子性操作
问题:在判断缓冲区状态和上锁之间如果有间隔,就会产生竞态条件(race condition)。
解决:使用 unique_lock 持有互斥锁,确保整个判断和操作是原子的。
2. 条件变量的正确使用
问题:如果单纯忙等待(自旋),会浪费 CPU 资源。
解决:使用条件变量的 wait() 方法,线程会在条件不满足时阻塞,在条件满足时被唤醒。
3. 消费者的提前退出问题
问题:消费者可能在生产完成但仍有任务未消费时误判为完成而提前退出。
解决:
- 在等待条件中添加
producedCount >= TOTAL_ITEMS的判定 - 在消费前再次确认缓冲区状态
- 生产完成后通过
notify_all()唤醒所有消费者
4. 计数器管理
通过 producedCount 和 consumedCount 追踪生产消费进度,确保:
- 生产者知道何时停止生产
- 消费者知道何时停止消费
- 整个过程的数据完整性
实验收获
通过这个实验,我深刻理解了:
- 并发编程的复杂性 —— 看似简单的任务在多线程环境中会产生各种意想不到的问题
- 同步机制的必要性 —— 没有同步控制,共享资源无法被正确访问
- 标准库的强大 —— C++ 的
mutex和condition_variable提供了优雅的同步解决方案 - 调试能力的重要性 —— 多线程程序的时序问题往往难以复现,需要仔细的设计和测试
- 系统设计的价值 —— 良好的同步方案不仅能保证正确性,还能提高系统的吞吐量和响应速度
这个实验让我从理论走向实践,对操作系统中最核心的概念有了更深层次的认识。在未来的高并发编程中,这些知识将是不可或缺的基础。