当前位置: 首页 > backend >正文

Golang——8、协程和管道

协程和管道

  • 1、协程
    • 1.1、进程、线程和协程
    • 1.2、goroutine的使用以及sync.WaitGroup
    • 1.3、启动多个协程
    • 1.4、设置Golang并行运行的时候占用的cup数量
    • 1.5、goroutine统计素数
  • 2、管道
    • 2.1、管道的操作
    • 2.2、协程和管道协同
    • 2.3、单向管道
    • 2.4、多路复用之select
    • 2.5、解决协程中出现的异常问题
  • 3、Golang协程同步与互斥
    • 3.1、互斥锁
    • 3.2、读写锁
    • 3.3、条件变量

1、协程

1.1、进程、线程和协程

进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。

线程是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位

并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。
并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。

通俗的讲多线程程序在单核CPU上面运行就是并发,多线程程序在多核CUP上运行就是并行,如果线程数大于CPU核数,则多线程程序在多个CPU上面运行既有并行又有并发。

Golang中的协程:
Golang中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang程序的主线程上可以起多个协程。Golang 中多协程可以实现并行或者并发。
协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang的一大特色就是从语言层面原生支持协程,在函数或者方法前面加go关键字就可创建一个协程。可以说Golang中的协程就是goroutine 。

Golang 中的多协程有点类似其他语言中的多线程。
多协程和多线程:Golang 中每个goroutine (协程) 默认占用内存远比Java 、C的线程少。
OS线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右),一个goroutine (协程)占用内存非常小,只有 2KB 左右,多协程 goroutine切换调度开销方面远比线程要少。


1.2、goroutine的使用以及sync.WaitGroup

下面实现创建一个协程,在协程和主线程中分别执行打印语句,每次休眠一秒。

package mainimport ("fmt""time"
)func test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}
}func main() {go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(time.Second)}
}

在这里插入图片描述


但是有个问题,如果主线程执行的速度比较快呢,我们可以修改一下代码,让主线程跑快一些。
在这里插入图片描述
此时我们发现主线程执行完后,协程不会再继续执行了。这是因为主线程执行完后整个程序就退出了。
所以我们需要使用sync.WaitGroup来让主线程等待协程。

package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test() {for i := 0; i < 3; i++ {fmt.Println("test...")time.Sleep(time.Second)}wg.Done()
}func main() {wg.Add(1)go test()for i := 0; i < 3; i++ {fmt.Println("main...")time.Sleep(100)}wg.Wait()
}

在这里插入图片描述
可以看到此时主线程执行完后会等待协程执行完,然后才会退出。有点类似于创建进程/线程并进行进程/线程等待回收。
其中:sync.WaitGroup本质上是一个计数器,Add方法表示增加计数器,Done表示让计数器减1,Wait表示等待协程执行完毕。


1.3、启动多个协程

package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc test(id int) {for i := 1; i <= 3; i++ {fmt.Printf("我是协程[%v]..., i=%d\n", id, i)}wg.Done()
}func main() {for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()
}

在这里插入图片描述


1.4、设置Golang并行运行的时候占用的cup数量

可以使用runtime.NumCPU()来获取当前计算机上CPU核心的数量。

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行Go代码。默认值是机器上的 CPU 核心数。 例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到8个OS线程上。
Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

package mainimport ("fmt""runtime"
)func main() {num := runtime.NumCPU()fmt.Println("CPU数量为:", num)runtime.GOMAXPROCS(num - 1)
}

1.5、goroutine统计素数

现在假设要统计1->50000000中有多少素数,最普遍的做法是使用一个for循环来做,如下:

package mainimport ("fmt""time"
)func main() {u1 := time.Now().Unix()// var cnt = 0for i := 2; i <= 50000000; i++ {flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {// cnt++}}// fmt.Println("共有素数:", cnt)u2 := time.Now().Unix()fmt.Println("花费时间:", u2-u1)
}

在这里插入图片描述
我们发现运行时间高达12S。下面我们使用goroutine试试看:
我们创建5个协程来完成,每个协程处理一千万个数据。

package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc test(x int) {for i := (x-1)*10000000 + 1; i <= x*10000000; i++ {if i == 1 {continue}flag := truefor j := 2; j*j <= i; j++ {if i%j == 0 {flag = falsebreak}}if flag {}}wg.Done()
}func main() {u1 := time.Now().Unix()for i := 1; i <= 5; i++ {wg.Add(1)go test(i)}wg.Wait()u2 := time.Now().Unix()fmt.Println("花费时间:", u2-u1)
}

在这里插入图片描述

可以看到这里我们花费时间大大的降低了,那如果我们想实现几个协程判断素数,其中一个协程进行打印呢?就需要使用到下面的管道了。


2、管道

管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

2.1、管道的操作

1、channel类型。
在这里插入图片描述
2、创建管道需要使用make函数。
在这里插入图片描述

3、channel操作。
管道有发送(send)、接收(receive)和关闭(close)三种操作。其中发送和接收都需要使用<-符号,例子如下:

package mainimport "fmt"func main() {// 1.创建管道ch := make(chan int, 3)// 2.给管道发送数据ch <- 10ch <- 20ch <- 30// 3.从管道获取数据a := <-chfmt.Println(a)fmt.Printf("值: %v, 类型: %T, 长度: %d, 容量: %d\n", ch, ch, len(ch), cap(ch))
}

在这里插入图片描述
管道有点类似队列的结构特点,所以获取的a值为10。另外打印管道返回的是一个地址,容量为3,由于我们取出了一个数据,所以长度为2。

4、管道是引用数据类型。

package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20ch2 := chch2 <- 30<-ch<-chfmt.Println(<-ch2)
}

在这里插入图片描述

5、管道阻塞
当管道中没有数据,再去取就会阻塞。当管道中数据写满了,再去取也会阻塞。

package mainfunc main() {ch := make(chan int, 3)ch <- 10ch <- 20ch <- 30ch <- 40
}

在这里插入图片描述

package mainimport "fmt"func main() {ch := make(chan int, 3)ch <- 10ch <- 20<-ch<-chnum := <-chfmt.Println(num)
}

在这里插入图片描述


6、for range遍历管道

package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for v := range ch {fmt.Println(v)}
}

在这里插入图片描述

注意:for range遍历管道只有一个返回值,并且会报错。
解决办法:调用close函数关闭管道,这样for range遍历结束就会退出,不会报错。

package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}close(ch)for v := range ch {fmt.Println(v)}
}

在这里插入图片描述

另外还可以直接使用for循环遍历,不过需要知道管道中的元素个数。

package mainimport "fmt"func main() {var ch = make(chan int, 5)for i := 1; i <= 5; i++ {ch <- i}for i := 0; i < len(ch); i++ {fmt.Println(<-ch)}
}

在这里插入图片描述


2.2、协程和管道协同

7、需求:使用goroutine和channel协同工作。

  • 开启一个协程向管道中写入数据。
  • 开启一个协程丛管道中读取数据。
  • 主线程必须等协程操作完才能退出。
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入数据:", i)time.Sleep(500)}close(ch)wg.Done()
}func fn2(ch chan int) {for i := 1; i <= 5; i++ {x := <-chfmt.Println("协程[2]从管道中读取数据:", x)time.Sleep(500)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(2)go fn1(ch)go fn2(ch)wg.Wait()
}

在这里插入图片描述
这里先读取是因为协程1已经把数据写入了,只不过还没打印出来就被协程2取走并打印输出了。管道是自带同步和互斥机制的,所以哪怕让协程2休眠时间远短于协程1,协程2也会阻塞住等待。


再来看另外一个例子,使用go关键字配合匿名自执行函数创建协程。

package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc main() {var ch = make(chan int, 3)wg.Add(1)go func() {for i := 1; i <= 3; i++ {num := <-chfmt.Println(num)}wg.Done()}()wg.Add(1)go func() {for i := 1; i <= 3; i++ {time.Sleep(time.Second)ch <- i}wg.Done()}()wg.Wait()
}

在这里插入图片描述
这里有点类似于C++中通过lambda表达式创建线程执行。


需求:改善上面实现的素数判断,还是创建多个协程来判断素数,但是我们还要创建一个协程来打印素数,这就需要实现协程间通信,所以就需要使用协程+管道。

  • 创建一个管道intChain和一个协程,这个协程负责写入需要判断的值,然后判断素数的协程从管道intChain中获取数据进行判断。
  • 创建16个协程和一个管道primeChain,这16个协程从上面的管道intChain中获取数据进行判断,如果是素数就写入到新创建的管道primeChain中。
  • 创建一个打印素数的协程,该协程从存放素数的管道primeChain中获取数据打印输出。
  • 主线程进行等待

但是还要注意,我们打印素数协程是使用for range遍历管道的,所以需要close管道,而我们不能在执行方法中随意close管道,因为可能其他协程还要写入,所以还需要一个exitChain来标识,当判断素数的协程执行完就向exitChain写入true。然后我们另外创建一个协程来读取exitChain,当十六次全部读取完毕就可以关闭primeChain,这样for range就不会出错了。
实现代码如下:

package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc putNum(intChain chan int) {for i := 2; i <= 100; i++ {intChain <- i}close(intChain)wg.Done()
}func isPrime(intChain chan int, primeChain chan int, exitChain chan bool) {for v := range intChain {flag := truefor j := 2; j*j <= v; j++ {if v%j == 0 {flag = falsebreak}}if flag {primeChain <- v}}exitChain <- truewg.Done()
}func printPrime(primeChain chan int) {for v := range primeChain {fmt.Printf("%v是素数\n", v)}wg.Done()
}func main() {intChan := make(chan int, 1000)primeChan := make(chan int, 1000)exitChan := make(chan bool, 16)wg.Add(1)go putNum(intChan)for i := 0; i < 16; i++ {wg.Add(1)go isPrime(intChan, primeChan, exitChan)}wg.Add(1)go printPrime(primeChan)wg.Add(1)go func() {for i := 0; i < 16; i++ {<-exitChan}close(primeChan)wg.Done()}()wg.Wait()fmt.Println("执行完毕...")
}

在这里插入图片描述


2.3、单向管道

有的时候我们会将管道作为参数在多个任务函数间传递, 很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收。

单向管道的实现如下,在chan左边或右边添加<-。

// 声明为只写
var chan1 chan<- int
chan1 = make(chan int, 3)// 声明为只读
var chan2 <-chan int
chan2 = make(chan int, 3)

举个例子,创建两个协程实现一个协程向管道中写入数据,另一个协程从管道中读取数据。按之前的写法如下:

package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch chan int) {for v := range ch {fmt.Println("协程[2]从管道中读取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}

在这里插入图片描述

这么写其实也没有什么问题,但是我们可以在函数参数上进一步限制管道,对于fn1来说,该管道只进行写入,对于fn2来说,该管道只进行读取,所以可以修改成下面的代码:

package mainimport ("fmt""sync""time"
)var wg sync.WaitGroupfunc fn1(ch chan<- int) {for i := 1; i <= 5; i++ {ch <- ifmt.Println("协程[1]向管道中写入:", i)time.Sleep(time.Millisecond * 100)}close(ch)wg.Done()
}func fn2(ch <-chan int) {for v := range ch {fmt.Println("协程[2]从管道中读取:", v)time.Sleep(time.Millisecond * 100)}wg.Done()
}func main() {var ch = make(chan int, 5)wg.Add(1)go fn1(ch)wg.Add(1)go fn2(ch)wg.Wait()
}

2.4、多路复用之select

在某些场景下我们需要同时从多个管道中读取数据,这时候就可以使用多路复用技术。多路复用本质上是一种就绪事件的通知机制。

select的使用类似于switch语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送) 过程。select会一直等待,直到底层事件就绪时, 就会执行case分支对应的语句。 具体格式如下:
在这里插入图片描述
当读取完所有数据后就会走default。

例如下面读取两个管道中的数据,可以创建两个协程来读取,也可以使用多路复用。

package mainimport "fmt"func main() {intChan := make(chan int, 10)for i := 1; i <= 10; i++ {intChan <- i}strChan := make(chan string, 5)for i := 1; i <= 5; i++ {strChan <- fmt.Sprintf("hello-%d", i)}for {select {case v := <-intChan:fmt.Println("从intChan中获取数据:", v)case v := <-strChan:fmt.Println("从strChan中获取数据:", v)default:fmt.Println("数据获取完毕...")return}}
}

注意:
1、走到default表示管道中的数据都获取完毕了,由于外层是for死循环,所以需要return退出。
2、使用select来获取管道中的数据,不需要close管道。


2.5、解决协程中出现的异常问题

package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {var m map[string]stringm["username"] = "张三"
}func main() {go print()go test()time.Sleep(time.Second)
}

在上面的test中,由于我们只是声明了m,没有使用make函数来创建空间,所以该协程出现异常导致整个程序崩溃,类似于C/C++中线程出现异常导致整个进程崩溃。
所以我们可以使用defer + recover来解决。

package mainimport ("fmt""time"
)func print() {for i := 0; i < 5; i++ {fmt.Println("hello...")}
}func test() {defer func() {err := recover()if err != nil {fmt.Println("err:", err)}}()var m map[string]stringm["username"] = "张三"
}func main() {go print()go test()time.Sleep(time.Second)
}

3、Golang协程同步与互斥

3.1、互斥锁

多协程访问共享资源不加以保护就会出问题,下面用多协程模拟一轮抢票。

package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroupfunc GetTicket(i int) {for {if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)ticket--} else {break}}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}

在这里插入图片描述

多协程共享全局变量,在进行抢票的时候我们发现多个协程竟然抢到同一张票,所以我们需要加锁保护。Golang中的互斥量使用很简单,只需要在全局定义一个sync.Mutex对象,调用其中的Lock和Unlock方法即可。

package mainimport ("fmt""sync""time"
)var ticket = 10000
var wg sync.WaitGroup
var mutex sync.Mutexfunc GetTicket(i int) {for {mutex.Lock()if ticket > 0 {time.Sleep(time.Microsecond * 1000)fmt.Printf("协程[%d]抢到票: %d\n", i, ticket)ticket--} else {mutex.Unlock()break}mutex.Unlock()}wg.Done()
}func main() {for i := 1; i <= 4; i++ {wg.Add(1)go GetTicket(i)}wg.Wait()
}

在这里插入图片描述


3.2、读写锁

读写锁保证任何时刻只有读者或者只有写者,如果是写者只能有一个写者,如果是读者可以有多个读者。使用如下:

var rwMtx sync.RWMutex  // 定义读写锁
rwMtx.Lock()   //写者加锁
rwMtx.Unlock() //写者解锁
rwMtx.RLock() // 读者加锁
rwMtx.RUnlock() // 读者解锁

下面创建一个协程协程写入数据,另一批协程读取数据。

package mainimport ("fmt""sync"
)var wg sync.WaitGroup
var rwMtx sync.RWMutexfunc read() {rwMtx.RLock()fmt.Println("协程读取数据...")rwMtx.RUnlock()wg.Done()
}func write() {rwMtx.Lock()fmt.Println("协程写入数据...")rwMtx.Unlock()wg.Done()
}func main() {for i := 0; i < 10; i++ {wg.Add(1)go write()}for i := 0; i < 10; i++ {wg.Add(1)go read()}wg.Wait()
}

在这里插入图片描述


3.3、条件变量

条件变量是用来实现协程同步和互斥的。使用如下:

var mutex sync.Mutex
var cond = sync.NewCond(&mutex) // 传入锁初始化条件变量
cond.Wait()      // 等待条件变量
cond.Signal()    // 唤醒一个协程
cond.Broadcast() // 唤醒所有协程
cond.L.Lock()    // 加锁,本质使用的是传入的mutex锁
cond.L.Unlock()  // 解锁,本质使用的是传入的mutex锁

加锁可以直接使用条件变量提供的方法加锁,也可以使用我们定义的锁来加锁,但是要保证是同一把锁。

下面使用条件变量实现协程同步和互斥,需求:两个协程交替打印奇数和偶数。

package mainimport ("fmt""sync""time"
)var mutex sync.Mutex
var cond = sync.NewCond(&mutex)
var wg sync.WaitGroup
var x = 1
var flag = truefunc fn1() {for {cond.L.Lock()for !flag {cond.Wait()}fmt.Println("协程[1]打印数据:", x)x++flag = falsetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func fn2() {for {cond.L.Lock()for flag {cond.Wait()}fmt.Println("协程[2]打印数据:", x)x++flag = truetime.Sleep(time.Second)cond.Signal()cond.L.Unlock()}wg.Done()
}func main() {wg.Add(1)go fn1()wg.Add(1)go fn2()wg.Wait()
}

在这里插入图片描述


由于管道自带同步互斥保护机制,所以也可以使用协程+管道来实现。

package mainimport ("fmt""sync""time"
)var x = 1
var wg sync.WaitGroupfunc fn1(ch1 <-chan bool, ch2 chan<- bool) {for {<-ch1fmt.Println("协程[1]打印数据:", x)time.Sleep(time.Second)x++ch2 <- true}wg.Done()
}func fn2(ch1 chan<- bool, ch2 <-chan bool) {for {<-ch2fmt.Println("协程[2]打印数据:", x)time.Sleep(time.Second)x++ch1 <- true}wg.Done()
}func main() {var ch1 = make(chan bool, 1)var ch2 = make(chan bool, 1)ch1 <- truewg.Add(1)go fn1(ch1, ch2)wg.Add(2)go fn2(ch1, ch2)wg.Wait()
}

在这里插入图片描述


http://www.xdnf.cn/news/12197.html

相关文章:

  • 更新Java的环境变量后VScode/cursor里面还是之前的环境变量
  • 【Go语言基础【5】】运算符基础
  • Kubernetes (k8s)版本发布情况
  • Java 依赖注入、控制反转与面向切面:面试深度解析
  • Deployment实现扩展/收缩,以及滚动更新
  • 数据结构第八章(二)-交换排序
  • dvwa14——JavaScript
  • 多层PCB技术解析:从材料选型到制造工艺的深度实践
  • Python 训练营打卡 Day 44
  • Linux下JSON序列化与反序列化方法
  • Python Day44
  • 数据可视化大屏案例落地实战指南:捷码平台7天交付方法论
  • 【达梦数据库】OOM问题排查思路
  • React 新项目
  • OGG-01635 OGG-15149 centos服务器远程抽取AIX oracle11.2.0.4版本
  • Spring框架学习day7--SpringWeb学习(概念与搭建配置)
  • Eureka REST 相关接口
  • 云原生思维重塑数字化基座:从理念到实践的深度剖析
  • Python基于蒙特卡罗方法实现投资组合风险管理的VaR与ES模型项目实战
  • Django CMS 的 Demo
  • 每日算法 -【Swift 算法】三数之和最接近目标值
  • Golang——9、反射和文件操作
  • Redis:介绍和认识,通用命令,数据类型和内部编码,单线程模型
  • 深入浅出玩转物联网时间同步:基于BC260Y的NTP实验与嵌入式仿真教学革命
  • 从《现实不似你所见》探寻与缘起性空的思想交织
  • MySQL间隙锁入手,拿下间隙锁面试与实操
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]: TTask创建多线程, 更简单, 更快捷.
  • 报告精读:“数据银行”概念模型与建设规划研究报告【附全文阅读】
  • JavaSec-SSTI - 模板引擎注入
  • 【ArcGIS应用】ArcGIS‌应用如何进行影像分类?