并发介绍

  1. 并发和并行
    • 并发:本质还是串行
      • 食堂窗口一个大妈(同一时间类只能给一个人打饭)
      • Python 本质没有并行的线程
    • 并行:任务分布在不同 CPU 上,同一时间点同时执行
      • 并行就是有多个食堂大妈,同时给不同人打饭

  1. 协程和线程
    • 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的
    • 线程:一个线程上可以跑多个协程,协程是轻量级的线程。
    • 线程和协程最大的区别
      • 开启一个线程需要大概 2M 空间,而且需要 CPU 调度才能执行,线程会强占 CPU
      • 开启一个协程大概只需要 2K 的空间,而且是由 Go 解释器自己实现的 GPM 调度,主动退出
      • 可以同时启动成千上万个 goroutine 而不会过度占用内存
      • 如果开启成千上万个线程,不仅会大量占用内存,甚至导致机器崩溃;操作系统调度线程也需要耗费大量时间
      • 协程如果需要用 CPU 才会去使用 CPU,如果没有使用 CPU 的需求,它就会主动把 CPU 让给其他协程执行
      • 线程在时间片内,即使不使用 CPU,比如当前正在从磁盘读数据,它也不会让出 CPU

goroutine

  1. 多线程编程缺点

    • 在 Java/C++ 中我们要实现并发编程时,通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务,同时需要自己去调度线程执行任务并维护上下文切换
  2. goroutine

    • Go 语言中的 goroutine 是由 Go 的运行时(runtime)调度和管理的。
    • Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
    • Go 语言之所以被称为现代化的编程语言,就是因为它在语言层面已经内置了调度和上下文切换的机制。
    • 在 Go 语言编程中,你不需要自己写进程、线程、协程,你的技能包里只有一个技能 —— goroutine
    • 当你需要让某个任务并发执行时,只需将该任务包装成一个函数,并开启一个 goroutine 去执行该函数即可。

协程基本使用

  1. 启动一个协程

    • 主线程中每 100 毫秒打印一次,总共打印 2 次,同时开启一个协程打印 10 次
package main
import (
    "fmt"
    "time"
)

func test() {
    for i := 0; i < 10; i++ {
        fmt.Println("test() 你好golang")
        time.Sleep(time.Millisecond * 100)
    }
}

func main() {
    go test() // 表示开启一个协程
    for i := 0; i < 2; i++ {
        fmt.Println("main() 你好golang")
        time.Sleep(time.Millisecond * 100)
    }
}
  1. WaitGroup

    • 主线程退出后所有的协程无论有没有执行完毕都会退出。所以我们在主进程中可以通过 WaitGroup 等待协程执行完毕。

    • sync.WaitGroup 内部维护着一个计数器,计数器的值可以增加和减少。

    • 例如当我们启动了 N 个并发任务时,就将计数器值增加 N。

    • 每个任务完成时通过调用 Done() 方法将计数器减 1。

    • 通过调用 Wait() 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

var wg sync.WaitGroup    // 第一步:定义一个计数器
wg.Add(1)                // 第二步:开启一个协程计数器+1
wg.Done()                // 第三步:协程执行完毕,计数器-1
wg.Wait()                // 第四步:计数器为0时退出

package main
import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup    // 第一步:定义一个计数器
func test1() {
    for i := 0; i < 10; i++ {
        fmt.Println("test1() 你好golang-", i)
        time.Sleep(time.Millisecond * 100)
    }
    wg.Done() //协程计数器-1  // 第三步:协程执行完毕,计数器-1
}
func test2() {
    for i := 0; i < 2; i++ {
        fmt.Println("test2() 你好golang-", i)
        time.Sleep(time.Millisecond * 100)
    }
    wg.Done() //协程计数器-1
}
func main() {
    wg.Add(1) //协程计数器+1    第二步:开启一个协程计数器+1
    go test1() // 表示开启一个协程
    wg.Add(1) //协程计数器+1
    go test2() // 表示开启一个协程
    wg.Wait() //等待协程执行完毕 // 第四步:计数器为0时退出

    fmt.Println("主线程退出")
}
  1. 开启多个协程

    • 在 Go 语言中实现并发非常简单,可以轻松启动多个 goroutine,并使用 sync.WaitGroup 等待它们执行完毕。多次执行上面的代码,会发现每次打印的数字顺序都不一致。这是因为 10 个 goroutine 是并发执行的,而 goroutine 的调度是随机的。
package main
import (
    "fmt"
    "sync"
)

var wg sync.WaitGroup

func hello(i int) {
    defer wg.Done() // goroutine结束就登记-1
    fmt.Println("Hello Goroutine!", i)
}

func main() {
    for i := 0; i < 10; i++ {
        wg.Add(1) // 启动一个goroutine就登记+1
        go hello(i)
    }
    wg.Wait() // 等待所有登记的goroutine都结束
}

channel

  1. Channel说明

    • 共享内存交互数据弊端

      • 单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行的意义。
      • 虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。
      • 为了保证数据交换的正确性,必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。
    • Channel好处

      • Go 语言中的通道(channel)是一种特殊的类型。
      • 通道像一个传送带或者队列,总是遵循先入先出(FIFO)的规则,保证收发数据的顺序。
      • 每一个通道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。
      • 如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。
      • Channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。
  2. Channel类型

    • Channel 是一种类型,一种引用类型。声明管道类型的格式如下:
var 变量 chan 元素类型
var ch1 chan int   // 声明一个传递整型的管道
var ch2 chan bool  // 声明一个传递布尔型的管道
var ch3 chan []int // 声明一个传递 int 切片的管道
  1. 创建channel

    • 声明的管道后需要使用 make 函数初始化之后才能使用。创建 channel 的格式如下: make(chan 元素类型, 容量)
// 创建一个能存储 10 个 int 类型数据的管道
ch1 := make(chan int, 10)
// 创建一个能存储 4 个 bool 类型数据的管道
ch2 := make(chan bool, 4)
// 创建一个能存储 3 个[]int 切片类型数据的管道
ch3 := make(chan []int, 3)
  1. Channel操作
package main
import "fmt"

func main() {
    // 1、创建channel
    ch := make(chan int, 5)
    // 2、向channel放入数据
    ch <- 10
    ch <- 12
    fmt.Println("发送成功", ch)
    // 3、向channel取值
    v1 := <-ch
    fmt.Println(v1)
    v2 := <-ch
    fmt.Println(v2)
    // 4、空channel取值报错
    v3 := <-ch
    fmt.Println("v3", v3)
}
  1. 优雅的从Channel取值

    • 当通过通道发送有限的数据时,可以通过 close 函数关闭通道来告知从该通道接收值的 goroutine 停止等待。当通道被关闭时,往该通道发送值会引发 panic,从该通道里接收的值一直都是类型零值。可以通过 for range 的方式判断通道是否关闭。
package main
import "fmt"

func main() {
    ch := make(chan int, 5)
    ch <- 10
    ch <- 12
    ch <- 14
    ch <- 16
    ch <- 18
    close(ch)
    for i := range ch { // 通道关闭后会退出for range循环
        fmt.Println(i)
    }
}
  1. Goroutine和Channel小案例

    • 请完成 goroutine 和 channel 协同工作的案例,具体要求:
      • 开启一个 writeData 协程,向管道 intChan 中写入 50 个整数。
      • 开启一个 readData 协程,从管道 intChan 中读取 writeData 写入的数据。
      • 注意:writeDatareadDate 操作的是同一个管道。
      • 主线程需要等待 writeDatareadData 协程都完成工作才能退出。

package main
import (
    "fmt"
)

//write Data
func writeData(intChan chan int) {
    for i := 1; i <= 50; i++ {
        //放入数据
        intChan <- i
        fmt.Println("writeData ", i)
    }
    close(intChan) //关闭
}

//read data
func readData(intChan chan int, exitChan chan bool) {
    for {
        v, ok := <-intChan
        if !ok {
            break
        }
        fmt.Printf("readData 读到数据=%v\n", v)
    }
    //readData 读取完数据后,即任务完成
    exitChan <- true
    close(exitChan)
}

func main() {
    //创建两个管道
    intChan := make(chan int, 10)
    exitChan := make(chan bool, 1)

    go writeData(intChan)
    go readData(intChan, exitChan)

    for {
        _, ok := <-exitChan
        if !ok {
            break
        }
    }
}

Select 多路复用

  1. Select说明

    • 传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock,在实际开发中,可能我们不好确定什么时候关闭该管道。这种 for range 方式虽然可以实现从多个管道接收值的需求,但是运行性能会差很多。为了应对这种场景,Go 内置了 select 关键字,可以同时响应多个管道的操作。
    • select 的使用类似于 switch 语句,它有一系列 case 分支和一个默认的分支。每个 case 会对应一个管道的通信(接收或发送)过程。select 会一直等待,直到某个 case 的通信操作完成时,就会执行 case 分支对应的语句。
    • 具体格式如下:
select {
case v := <-chan1:
    // 如果chan1成功读到数据,则进行该case处理语句
case chan2 <- 1:
    // 如果成功向chan2写入数据,则进行该case处理语句
default:
    // 如果上面都没有成功,则进入default处理流程
}
    
  1. Select的使用

    • 使用 select 语句能提高代码的可读性。可处理一个或多个 channel 的发送/接收操作。如果多个 case 同时满足,select 会随机选择一个。对于没有 caseselect{} 会一直等待,可用于阻塞 main 函数。
package main

import (
    "fmt"
)

func main() {
    // 在某些场景下我们需要同时从多个通道接收数据,这个时候就可以用到golang中给我们提供的select多路复用

    //1.定义一个管道 10个数据int
    intChan := make(chan int, 10)
    for i := 0; i < 10; i++ {
        intChan <- i
    }
    //2.定义一个管道 5个数据string
    stringChan := make(chan string, 5)
    for i := 0; i < 5; i++ {
        stringChan <- "hello" + fmt.Sprintf("%d", i)
    }
    // 使用select来获取channel里面的数据的时候不需要关闭channel
    for {
        select {
        case v := <-intChan:
            fmt.Printf("从 intChan 读取的数据%d\n", v)
        case v := <-stringChan:
            fmt.Printf("从 stringChan 读取的数据%v\n", v)
        default:
            fmt.Printf("数据获取完毕")
            return // 注意退出
        }
    }
}

互斥锁

  • 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine 可以访问共享资源。

  • Go 语言中使用 sync 包的 Mutex 类型来实现互斥锁。
    • 在多核的系统中,这 1000 个协程被分配到多个线程里面运行,那么就有可能是并行运行的。
    • 比如当前的 num=987,后面两个线程同时执行,那么结果就是 988,但是这不是我们想要的结果,我们想要的是 999
    • 如何解决?其实可以通过互斥锁来保证代码在并发运行时,只有一个线程可以修改 num 的值。
package main

import (
    "fmt"
    "sync"
)

var num int

var mtx sync.Mutex
var wg sync.WaitGroup

func add() {
    defer wg.Done()
    mtx.Lock()
    num += 1
    mtx.Unlock()
}

func main() {
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go add()
    }
    wg.Wait()
    fmt.Println("num:", num)
}

Semaphore信号量

  • 含义
    • Semaphore是用来控制多个goroutine同时访问多个资源的并发原语
    • 包含2个操作:
      • P 操作 (decrease, wait, acquire) 是减少信号量的计数值
      • V 操作 (increase, signal, release) 是增加信号量的计数值
    • 补充说明 P/V 的含义:
      • P —— passeren,中文译为“通过”
      • V —— vrijgeven,中文译为“释放”(因为作者是荷兰人,上面单词为荷兰语)
  • 实现
    • 类型名叫Weighted
      • Acquire 方法:相当于P操作,你可以一次获取多个资源,如果没有足够的资源,那么调用者就会被阻塞
      • Release 方法:相当于V操作,可以将多个资源释放,放宽信号量
      • TryAcquire 方法:尝试获取N个资源,但是它不会阻塞,返回true,要么什么也不获取,返回false
  • 使用场景
    • 常用于控制并发
    • 常见错误
      • 请求了资源,但是忘记释放它
      • 积放了从未请求的资源
      • 长时间持有一个资源,即使不需要它
      • 不持有一个资源,想直接使用它
    • 避免误用
      • 请求多少资源,就释放多少资源

package main

import (
	"context"
	"fmt"
	"log"
	"runtime"
	"time"

	"golang.org/x/sync/semaphore"
)

var (
	maxWorkers = runtime.GOMAXPROCS(0)
	//设置信号量总数 当前应为4
	sema = semaphore.NewWeighted(int64(maxWorkers))
	//设置需要完成的任务数 当前应为16
	task = make([]int, maxWorkers*4)
)

func main() {
	ctx := context.Background()
	for i := range task {
		//获取信号量,如果没有则阻塞。
		log.Printf("阻塞中任务id: %v", i)
		//P操作 将sema信号量减1
		if err := sema.Acquire(ctx, 1); err != nil {
			log.Panicf("减少信号量报错:%v", err)
		}
		log.Printf("阻塞结束任务id: %v", i)

		go func(i int) {
			//V操作 将sema信号量释放 加1
			defer sema.Release(1)
			time.Sleep(10000 * time.Millisecond)
			task[i] = i + 1
		}(i)
	}

	if err := sema.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Printf("获取所有的worker失败: %v", err)
	}
	fmt.Println(task)
}


/* 输出

2024/08/29 11:41:40 阻塞中任务id: 0
2024/08/29 11:41:40 阻塞结束任务id: 0
2024/08/29 11:41:40 阻塞中任务id: 1
2024/08/29 11:41:40 阻塞结束任务id: 1
2024/08/29 11:41:40 阻塞中任务id: 2
2024/08/29 11:41:40 阻塞结束任务id: 2
2024/08/29 11:41:40 阻塞中任务id: 3
2024/08/29 11:41:40 阻塞结束任务id: 3
2024/08/29 11:41:40 阻塞中任务id: 4
2024/08/29 11:41:50 阻塞结束任务id: 4
2024/08/29 11:41:50 阻塞中任务id: 5
2024/08/29 11:41:50 阻塞结束任务id: 5
2024/08/29 11:41:50 阻塞中任务id: 6
2024/08/29 11:41:50 阻塞结束任务id: 6
2024/08/29 11:41:50 阻塞中任务id: 7
2024/08/29 11:41:50 阻塞结束任务id: 7
2024/08/29 11:41:50 阻塞中任务id: 8
2024/08/29 11:42:00 阻塞结束任务id: 8
2024/08/29 11:42:00 阻塞中任务id: 9
2024/08/29 11:42:00 阻塞结束任务id: 9
2024/08/29 11:42:00 阻塞中任务id: 10
2024/08/29 11:42:00 阻塞结束任务id: 10
2024/08/29 11:42:00 阻塞中任务id: 11
2024/08/29 11:42:00 阻塞结束任务id: 11
2024/08/29 11:42:00 阻塞中任务id: 12
2024/08/29 11:42:10 阻塞结束任务id: 12
2024/08/29 11:42:10 阻塞中任务id: 13
2024/08/29 11:42:10 阻塞结束任务id: 13
2024/08/29 11:42:10 阻塞中任务id: 14
2024/08/29 11:42:10 阻塞结束任务id: 14
2024/08/29 11:42:10 阻塞中任务id: 15
2024/08/29 11:42:10 阻塞结束任务id: 15
[1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16]

*/
上面的demo中通过日志时间可以看到,每次只并发了4个协程