On this page
article
2.Go的并发编程
简单介绍一下并发协程、管道、并发锁等
并发介绍
- 并发和并行
- 并发:本质还是串行
- 食堂窗口一个大妈(同一时间类只能给一个人打饭)
- Python 本质没有并行的线程
- 并行:任务分布在不同 CPU 上,同一时间点同时执行
- 并行就是有多个食堂大妈,同时给不同人打饭
- 并发:本质还是串行
- 协程和线程
- 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的
- 线程:一个线程上可以跑多个协程,协程是轻量级的线程。
- 线程和协程最大的区别:
- 开启一个线程需要大概 2M 空间,而且需要 CPU 调度才能执行,线程会强占 CPU
- 开启一个协程大概只需要 2K 的空间,而且是由 Go 解释器自己实现的 GPM 调度,主动退出
- 可以同时启动成千上万个 goroutine 而不会过度占用内存
- 如果开启成千上万个线程,不仅会大量占用内存,甚至导致机器崩溃;操作系统调度线程也需要耗费大量时间
- 协程如果需要用 CPU 才会去使用 CPU,如果没有使用 CPU 的需求,它就会主动把 CPU 让给其他协程执行
- 线程在时间片内,即使不使用 CPU,比如当前正在从磁盘读数据,它也不会让出 CPU
goroutine
-
多线程编程缺点
- 在 Java/C++ 中我们要实现并发编程时,通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换
-
goroutine
- Go 语言中的 goroutine 是由 Go 的运行时(runtime)调度和管理的。
- Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
- Go 语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
- 在 Go 语言编程中,你不需要自己写进程、线程、协程,你的技能包里只有一个技能 —— goroutine
- 当你需要让某个任务并发执行时,只需将该任务包装成一个函数,并开启一个 goroutine 去执行该函数即可。
协程基本使用
-
启动一个协程
- 主线程中每 100 毫秒打印一次,总共打印 2 次,同时开启一个协程打印 10 次
package main
import (
"fmt"
"time"
)
func test() {
for i := 0; i < 10; i++ {
fmt.Println("test() 你好golang")
time.Sleep(time.Millisecond * 100)
}
}
func main() {
go test() // 表示开启一个协程
for i := 0; i < 2; i++ {
fmt.Println("main() 你好golang")
time.Sleep(time.Millisecond * 100)
}
}
-
WaitGroup
-
主线程退出后所有的协程无论有没有执行完毕都会退出。所以我们在主进程中可以通过
WaitGroup
等待协程执行完毕。 -
sync.WaitGroup
内部维护着一个计数器,计数器的值可以增加和减少。 -
例如当我们启动了 N 个并发任务时,就将计数器值增加 N。
-
每个任务完成时通过调用
Done()
方法将计数器减 1。 -
通过调用
Wait()
来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。
-
var wg sync.WaitGroup // 第一步:定义一个计数器
wg.Add(1) // 第二步:开启一个协程计数器+1
wg.Done() // 第三步:协程执行完毕,计数器-1
wg.Wait() // 第四步:计数器为0时退出
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup // 第一步:定义一个计数器
func test1() {
for i := 0; i < 10; i++ {
fmt.Println("test1() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器-1 // 第三步:协程执行完毕,计数器-1
}
func test2() {
for i := 0; i < 2; i++ {
fmt.Println("test2() 你好golang-", i)
time.Sleep(time.Millisecond * 100)
}
wg.Done() //协程计数器-1
}
func main() {
wg.Add(1) //协程计数器+1 第二步:开启一个协程计数器+1
go test1() // 表示开启一个协程
wg.Add(1) //协程计数器+1
go test2() // 表示开启一个协程
wg.Wait() //等待协程执行完毕 // 第四步:计数器为0时退出
fmt.Println("主线程退出")
}
-
开启多个协程
- 在 Go 语言中实现并发非常简单,可以轻松启动多个 goroutine,并使用
sync.WaitGroup
等待它们执行完毕。多次执行上面的代码,会发现每次打印的数字顺序都不一致。这是因为 10 个 goroutine 是并发执行的,而 goroutine 的调度是随机的。
- 在 Go 语言中实现并发非常简单,可以轻松启动多个 goroutine,并使用
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
channel
-
Channel说明
-
共享内存交互数据弊端
- 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行的意义。
- 虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。
- 为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
-
Channel好处
- Go 语言中的通道(channel)是一种特殊的类型。
- 通道像一个传送带或者队列,总是遵循先入先出(FIFO)的规则,保证收发数据的顺序。
- 每一个通道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。
- 如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。
- Channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
-
-
Channel类型
- Channel 是一种类型,一种引用类型。声明管道类型的格式如下:
var 变量 chan 元素类型
var ch1 chan int // 声明一个传递整型的管道
var ch2 chan bool // 声明一个传递布尔型的管道
var ch3 chan []int // 声明一个传递 int 切片的管道
-
创建channel
- 声明的管道后需要使用
make
函数初始化之后才能使用。创建 channel 的格式如下:make(chan 元素类型, 容量)
- 声明的管道后需要使用
// 创建一个能存储 10 个 int 类型数据的管道
ch1 := make(chan int, 10)
// 创建一个能存储 4 个 bool 类型数据的管道
ch2 := make(chan bool, 4)
// 创建一个能存储 3 个[]int 切片类型数据的管道
ch3 := make(chan []int, 3)
- Channel操作
package main
import "fmt"
func main() {
// 1、创建channel
ch := make(chan int, 5)
// 2、向channel放入数据
ch <- 10
ch <- 12
fmt.Println("发送成功", ch)
// 3、向channel取值
v1 := <-ch
fmt.Println(v1)
v2 := <-ch
fmt.Println(v2)
// 4、空channel取值报错
v3 := <-ch
fmt.Println("v3", v3)
}
-
优雅的从Channel取值
- 当通过通道发送有限的数据时,可以通过
close
函数关闭通道来告知从该通道接收值的 goroutine 停止等待。当通道被关闭时,往该通道发送值会引发 panic,从该通道里接收的值一直都是类型零值。可以通过for range
的方式判断通道是否关闭。
- 当通过通道发送有限的数据时,可以通过
package main
import "fmt"
func main() {
ch := make(chan int, 5)
ch <- 10
ch <- 12
ch <- 14
ch <- 16
ch <- 18
close(ch)
for i := range ch { // 通道关闭后会退出for range循环
fmt.Println(i)
}
}
-
Goroutine和Channel小案例
- 请完成 goroutine 和 channel 协同工作的案例,具体要求:
- 开启一个
writeData
协程,向管道intChan
中写入 50 个整数。 - 开启一个
readData
协程,从管道intChan
中读取writeData
写入的数据。 - 注意:
writeData
和readDate
操作的是同一个管道。 - 主线程需要等待
writeData
和readData
协程都完成工作才能退出。
- 开启一个
- 请完成 goroutine 和 channel 协同工作的案例,具体要求:
package main
import (
"fmt"
)
//write Data
func writeData(intChan chan int) {
for i := 1; i <= 50; i++ {
//放入数据
intChan <- i
fmt.Println("writeData ", i)
}
close(intChan) //关闭
}
//read data
func readData(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Printf("readData 读到数据=%v\n", v)
}
//readData 读取完数据后,即任务完成
exitChan <- true
close(exitChan)
}
func main() {
//创建两个管道
intChan := make(chan int, 10)
exitChan := make(chan bool, 1)
go writeData(intChan)
go readData(intChan, exitChan)
for {
_, ok := <-exitChan
if !ok {
break
}
}
}
Select 多路复用
-
Select说明
- 传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock,在实际开发中,可能我们不好确定什么时候关闭该管道。这种
for range
方式虽然可以实现从多个管道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go 内置了select
关键字,可以同时响应多个管道的操作。 select
的使用类似于switch
语句,它有一系列case
分支和一个默认的分支。每个case
会对应一个管道的通信(接收或发送)过程。select
会一直等待,直到某个case
的通信操作完成时,就会执行case
分支对应的语句。- 具体格式如下:
- 传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock,在实际开发中,可能我们不好确定什么时候关闭该管道。这种
select {
case v := <-chan1:
// 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
// 如果成功向chan2写入数据,则进行该case处理语句
default:
// 如果上面都没有成功,则进入default处理流程
}
-
Select的使用
- 使用
select
语句能提高代码的可读性。可处理一个或多个 channel 的发送/接收操作。如果多个case
同时满足,select
会随机选择一个。对于没有case
的select{}
会一直等待,可用于阻塞main
函数。
- 使用
package main
import (
"fmt"
)
func main() {
// 在某些场景下我们需要同时从多个通道接收数据,这个时候就可以用到golang中给我们提供的select多路复用
//1.定义一个管道 10个数据int
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
//2.定义一个管道 5个数据string
stringChan := make(chan string, 5)
for i := 0; i < 5; i++ {
stringChan <- "hello" + fmt.Sprintf("%d", i)
}
// 使用select来获取channel里面的数据的时候不需要关闭channel
for {
select {
case v := <-intChan:
fmt.Printf("从 intChan 读取的数据%d\n", v)
case v := <-stringChan:
fmt.Printf("从 stringChan 读取的数据%v\n", v)
default:
fmt.Printf("数据获取完毕")
return // 注意退出
}
}
}
互斥锁
- 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine 可以访问共享资源。
- Go 语言中使用
sync
包的Mutex
类型来实现互斥锁。- 在多核的系统中,这 1000 个协程被分配到多个线程里面运行,那么就有可能是并行运行的。
- 比如当前的
num=987
,后面两个线程同时执行,那么结果就是988
,但是这不是我们想要的结果,我们想要的是999
。 - 如何解决?其实可以通过互斥锁来保证代码在并发运行时,只有一个线程可以修改
num
的值。
package main
import (
"fmt"
"sync"
)
var num int
var mtx sync.Mutex
var wg sync.WaitGroup
func add() {
defer wg.Done()
mtx.Lock()
num += 1
mtx.Unlock()
}
func main() {
for i := 0; i < 1000; i++ {
wg.Add(1)
go add()
}
wg.Wait()
fmt.Println("num:", num)
}
Semaphore信号量
- 含义
- Semaphore是用来控制多个goroutine同时访问多个资源的并发原语
- 包含2个操作:
- P 操作 (decrease, wait, acquire) 是减少信号量的计数值
- V 操作 (increase, signal, release) 是增加信号量的计数值
- 补充说明 P/V 的含义:
- P —— passeren,中文译为“通过”
- V —— vrijgeven,中文译为“释放”(因为作者是荷兰人,上面单词为荷兰语)
- 实现
- 类型名叫Weighted
- Acquire 方法:相当于P操作,你可以一次获取多个资源,如果没有足够的资源,那么调用者就会被阻塞
- Release 方法:相当于V操作,可以将多个资源释放,放宽信号量
- TryAcquire 方法:尝试获取N个资源,但是它不会阻塞,返回true,要么什么也不获取,返回false
- 类型名叫Weighted
- 使用场景
- 常用于控制并发
- 常见错误
- 请求了资源,但是忘记释放它
- 积放了从未请求的资源
- 长时间持有一个资源,即使不需要它
- 不持有一个资源,想直接使用它
- 避免误用
- 请求多少资源,就释放多少资源
package main
import (
"context"
"fmt"
"log"
"runtime"
"time"
"golang.org/x/sync/semaphore"
)
var (
maxWorkers = runtime.GOMAXPROCS(0)
//设置信号量总数 当前应为4
sema = semaphore.NewWeighted(int64(maxWorkers))
//设置需要完成的任务数 当前应为16
task = make([]int, maxWorkers*4)
)
func main() {
ctx := context.Background()
for i := range task {
//获取信号量,如果没有则阻塞。
log.Printf("阻塞中任务id: %v", i)
//P操作 将sema信号量减1
if err := sema.Acquire(ctx, 1); err != nil {
log.Panicf("减少信号量报错:%v", err)
}
log.Printf("阻塞结束任务id: %v", i)
go func(i int) {
//V操作 将sema信号量释放 加1
defer sema.Release(1)
time.Sleep(10000 * time.Millisecond)
task[i] = i + 1
}(i)
}
if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("获取所有的worker失败: %v", err)
}
fmt.Println(task)
}
/* 输出
2024/08/29 11:41:40 阻塞中任务id: 0
2024/08/29 11:41:40 阻塞结束任务id: 0
2024/08/29 11:41:40 阻塞中任务id: 1
2024/08/29 11:41:40 阻塞结束任务id: 1
2024/08/29 11:41:40 阻塞中任务id: 2
2024/08/29 11:41:40 阻塞结束任务id: 2
2024/08/29 11:41:40 阻塞中任务id: 3
2024/08/29 11:41:40 阻塞结束任务id: 3
2024/08/29 11:41:40 阻塞中任务id: 4
2024/08/29 11:41:50 阻塞结束任务id: 4
2024/08/29 11:41:50 阻塞中任务id: 5
2024/08/29 11:41:50 阻塞结束任务id: 5
2024/08/29 11:41:50 阻塞中任务id: 6
2024/08/29 11:41:50 阻塞结束任务id: 6
2024/08/29 11:41:50 阻塞中任务id: 7
2024/08/29 11:41:50 阻塞结束任务id: 7
2024/08/29 11:41:50 阻塞中任务id: 8
2024/08/29 11:42:00 阻塞结束任务id: 8
2024/08/29 11:42:00 阻塞中任务id: 9
2024/08/29 11:42:00 阻塞结束任务id: 9
2024/08/29 11:42:00 阻塞中任务id: 10
2024/08/29 11:42:00 阻塞结束任务id: 10
2024/08/29 11:42:00 阻塞中任务id: 11
2024/08/29 11:42:00 阻塞结束任务id: 11
2024/08/29 11:42:00 阻塞中任务id: 12
2024/08/29 11:42:10 阻塞结束任务id: 12
2024/08/29 11:42:10 阻塞中任务id: 13
2024/08/29 11:42:10 阻塞结束任务id: 13
2024/08/29 11:42:10 阻塞中任务id: 14
2024/08/29 11:42:10 阻塞结束任务id: 14
2024/08/29 11:42:10 阻塞中任务id: 15
2024/08/29 11:42:10 阻塞结束任务id: 15
[1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16]
*/