当前位置: 首页 > ai >正文

Go语言实现生产者-消费者问题的多种方法

Go语言实现生产者-消费者问题的多种方法

生产者-消费者问题是并发编程中的经典问题,涉及多个生产者生成数据,多个消费者消费数据,二者通过缓冲区(队列)进行协调,保证数据的正确传递和同步。本文将从简单到复杂,使用不同的 Go 语言并发原语实现生产者-消费者模型,并详细介绍所用知识点。


目录

  1. 方法一:使用无缓冲 Channel(同步通信)
  2. 方法二:使用带缓冲 Channel(异步通信)
  3. 方法三:使用 sync.Mutex + 条件变量 sync.Cond 实现缓冲区
  4. 方法四:使用 Channel + select 实现多路复用和超时控制

方法一:使用无缓冲 Channel(同步通信)

知识点

  • 无缓冲 Channel:发送和接收必须同时准备好,适合严格同步的场景。
  • Goroutine:轻量级线程,使用 go 关键字启动。
  • sync.WaitGroup:等待所有 goroutine 完成。

代码示例

package mainimport ("fmt""sync""time"
)func producer(id int, ch chan<- int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < 3; i++ {item := id*100 + ifmt.Printf("生产者 %d 生产了产品 %d\n", id, item)ch <- item                         // 发送数据,阻塞直到有消费者接收time.Sleep(100 * time.Millisecond) // 模拟生产时间}
}func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {defer wg.Done()for item := range ch {fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(150 * time.Millisecond) // 模拟消费时间}
}func main() {ch := make(chan int) // 无缓冲 channelvar wg sync.WaitGroup// 启动生产者for i := 1; i <= 2; i++ {wg.Add(1)go producer(i, ch, &wg)}// 启动消费者for i := 1; i <= 2; i++ {wg.Add(1)go consumer(i, ch, &wg)}// 等待生产者完成wg.Wait()// 关闭 channel,通知消费者结束close(ch)// 由于消费者在 range 中消费,关闭后会退出// 这里主 goroutine 退出,程序结束
}

说明

  • 生产者发送数据时会阻塞,直到消费者接收,保证同步。
  • 适合生产和消费速度相近的场景。
  • 关闭 channel 后,消费者会自动退出。

方法二:使用带缓冲 Channel(异步通信)

知识点

  • 带缓冲 Channel:允许生产者先发送一定数量数据,消费者稍后接收,提升并发效率。
  • 生产者和消费者速度不匹配时,缓冲区能暂存数据,减少阻塞。

代码示例

package mainimport ("fmt""math/rand""sync""time"
)func producer(id int, ch chan<- int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < 5; i++ {item := id*100 + ifmt.Printf("生产者 %d 生产了产品 %d\n", id, item)ch <- itemtime.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)}
}func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {defer wg.Done()for item := range ch {fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)}
}func main() {rand.Seed(time.Now().UnixNano())ch := make(chan int, 3) // 带缓冲 channel,缓冲区大小为3var wgProducers sync.WaitGroupvar wgConsumers sync.WaitGroup// 启动生产者for i := 1; i <= 3; i++ {wgProducers.Add(1)go producer(i, ch, &wgProducers)}// 启动消费者for i := 1; i <= 2; i++ {wgConsumers.Add(1)go consumer(i, ch, &wgConsumers)}// 等待所有生产者完成wgProducers.Wait()// 关闭 channel,通知消费者没有更多数据close(ch)// 等待所有消费者完成wgConsumers.Wait()fmt.Println("所有生产者和消费者已完成工作,程序结束")
}

说明

  • 生产者可以先发送数据到缓冲区,不必等待消费者立即接收。
  • 缓冲区大小影响生产者和消费者的阻塞情况。
  • 关闭 channel 后,消费者会自动退出。

方法三:使用 sync.Mutex + sync.Cond 实现缓冲区(手动实现队列)

知识点

  • sync.Mutex:互斥锁,保护共享资源。
  • sync.Cond:条件变量,支持等待和通知机制。
  • 手动实现缓冲区:用切片模拟队列,生产者和消费者通过条件变量协调。

代码示例

package mainimport ("fmt""sync""time"
)type Buffer struct {items    []intsize     intlock     sync.MutexnotEmpty *sync.CondnotFull  *sync.Cond
}func NewBuffer(size int) *Buffer {b := &Buffer{items: make([]int, 0, size),size:  size,}b.notEmpty = sync.NewCond(&b.lock)b.notFull = sync.NewCond(&b.lock)return b
}func (b *Buffer) Put(item int) {b.lock.Lock()defer b.lock.Unlock()// 如果缓冲区满,等待 notFull 信号for len(b.items) == b.size {b.notFull.Wait()}b.items = append(b.items, item)fmt.Printf("生产了产品 %d,缓冲区大小: %d\n", item, len(b.items))// 通知消费者缓冲区非空b.notEmpty.Signal()
}func (b *Buffer) Get() int {b.lock.Lock()defer b.lock.Unlock()// 如果缓冲区空,等待 notEmpty 信号for len(b.items) == 0 {b.notEmpty.Wait()}item := b.items[0]b.items = b.items[1:]fmt.Printf("消费了产品 %d,缓冲区大小: %d\n", item, len(b.items))// 通知生产者缓冲区非满b.notFull.Signal()return item
}func producer(id int, b *Buffer, count int, wg *sync.WaitGroup) {defer wg.Done()for i := 0; i < count; i++ {item := id*100 + ib.Put(item)time.Sleep(100 * time.Millisecond)}
}func consumer(id int, b *Buffer, wg *sync.WaitGroup, done <-chan struct{}) {defer wg.Done()for {select {case <-done:returndefault:item := b.Get()time.Sleep(150 * time.Millisecond)fmt.Printf("消费者 %d 处理了产品 %d\n", id, item)}}
}func main() {bufferSize := 5b := NewBuffer(bufferSize)var wgProducers sync.WaitGroupvar wgConsumers sync.WaitGroupdone := make(chan struct{})// 启动生产者numProducers := 2produceCount := 10for i := 1; i <= numProducers; i++ {wgProducers.Add(1)go producer(i, b, produceCount, &wgProducers)}// 启动消费者numConsumers := 3for i := 1; i <= numConsumers; i++ {wgConsumers.Add(1)go consumer(i, b, &wgConsumers, done)}// 等待生产者完成wgProducers.Wait()// 生产结束,等待缓冲区清空for {b.lock.Lock()empty := len(b.items) == 0b.lock.Unlock()if empty {break}time.Sleep(100 * time.Millisecond)}// 通知消费者退出close(done)// 等待消费者退出wgConsumers.Wait()fmt.Println("所有生产者和消费者已完成工作,程序结束")
}

说明

  • 手动实现缓冲区,生产者和消费者通过条件变量等待和通知。
  • 适合需要自定义缓冲区行为的场景。
  • 需要额外处理消费者退出逻辑。

方法四:使用 Channel + select 实现多路复用和超时控制

知识点

  • select:Go 语言中用于监听多个 channel 的操作,支持超时和默认分支。
  • 超时控制:防止 goroutine 永久阻塞。
  • 多路复用:同时监听多个事件。

代码示例

package mainimport ("fmt""math/rand""time"
)func producer(id int, ch chan<- int, done <-chan struct{}) {for i := 0; i < 10; i++ {item := id*100 + iselect {case ch <- item:fmt.Printf("生产者 %d 生产了产品 %d\n", id, item)case <-done:fmt.Printf("生产者 %d 收到退出信号\n", id)return}time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)}
}func consumer(id int, ch <-chan int, done <-chan struct{}) {for {select {case item, ok := <-ch:if !ok {fmt.Printf("消费者 %d 发现通道关闭,退出\n", id)return}fmt.Printf("消费者 %d 消费了产品 %d\n", id, item)time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)case <-done:fmt.Printf("消费者 %d 收到退出信号\n", id)returncase <-time.After(2 * time.Second):fmt.Printf("消费者 %d 超时退出\n", id)return}}
}func main() {rand.Seed(time.Now().UnixNano())ch := make(chan int, 5)done := make(chan struct{})// 启动生产者for i := 1; i <= 3; i++ {go producer(i, ch, done)}// 启动消费者for i := 1; i <= 2; i++ {go consumer(i, ch, done)}// 运行一段时间后关闭生产者time.Sleep(5 * time.Second)close(done) // 通知所有 goroutine 退出// 关闭 channel,通知消费者没有更多数据close(ch)// 主 goroutine 等待一段时间让所有 goroutine 退出time.Sleep(3 * time.Second)fmt.Println("程序结束")
}

说明

  • 使用 select 监听多个 channel,支持超时和退出信号。
  • 生产者和消费者都能响应退出通知,优雅结束。
  • 适合复杂场景下的生产者-消费者模型。

总结

方法复杂度关键知识点适用场景
方法一简单无缓冲 channel,阻塞同步生产消费速度相近,简单同步
方法二中等带缓冲 channel,异步通信生产消费速度不匹配,提升效率
方法三较复杂sync.Mutex + sync.Cond,手动缓冲区需要自定义缓冲区行为,复杂同步
方法四复杂select 多路复用,超时控制,退出通知复杂场景,需多事件监听和优雅退出

Go 语言提供了丰富的并发原语,能够灵活实现生产者-消费者模型。根据实际需求和复杂度选择合适的方法,能让程序更高效、健壮。


http://www.xdnf.cn/news/6616.html

相关文章:

  • 【C++重载操作符与转换】句柄类与继承
  • 自定义CString类与MFC CString类接口对比
  • eSwitch manager 简介
  • InfluxDB 2.7 连续查询实战指南:Task 替代方案详解
  • python中元组的操作
  • 后端框架(2):Java的反射机制
  • 高效便捷的文字识别方案与解析
  • MATLAB中的概率分布生成:从理论到实践
  • 记录一次服务器卡顿
  • Redisson分布式锁-锁的可重入、可重试、WatchDog超时续约、multLock联锁(一文全讲透,超详细!!!)
  • SD框架下 LoRA 训练教程3-LORA学习率调度器(Learning Rate Scheduler)核心策略与实践指南
  • C++_STL_map与set
  • Java【13_1】final、初始化块、继承(测试题)
  • 每日Prompt:迷你 3D 建筑
  • pcie phy-电气层-gen1/2(TX)
  • C++ 条件变量与线程通知机制:std::condition_variable
  • PD 分离推理的加速大招,百度智能云网络基础设施和通信组件的优化实践
  • 【data】上海膜拜数据
  • AWS云入门宝典
  • STM32外设AD/DA-基础及CubeMX配置
  • Web性能优化的未来:边缘计算、AI与新型渲染架构
  • 排序01:多目标模型
  • SQL Server权限设置的几种方法
  • 每周靶点:CA125、AFP分享
  • Hue面试内容整理-示例编码题
  • 如何选择高性价比的 1T 服务器租用服务​
  • 【Android构建系统】了解Soong构建系统
  • JS手写代码篇---手写 instanceof 方法
  • AGI大模型(18):各大平台RAG实现之智普RAG
  • 达梦数据库多版本并发控制(MVCC)_yxy