协程并发
协程并发
goroutine
介绍
在 Go 中,协程(goroutine)是一种轻量级的并发执行单位,允许你在同一个程序中同时执行多个函数,而无需显式地创建多个线程。协程是 Go 并发模型的核心组成部分,它们更加高效地利用了系统资源,使并发编程更加简洁和易于管理。
协程的特点和优势:
- 轻量级: 协程的创建和销毁开销很小,相比线程,可以创建更多的协程。
- 低成本: 在 Go 中,创建协程的成本远低于创建线程。
- 并发执行: 协程允许多个函数同时执行,但是它们在一个或多个线程上运行,由 Go 运行时调度管理。
- 通信通过共享内存: 协程之间通过共享内存来通信,但是 Go 也提供了通道(channel)作为一种更安全、更强大的通信机制。
- 高度抽象: Go 的并发模型使得编写并发程序变得更加简洁和直观。
案例
演示了如何使用协程并发地下载多个网页内容
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
)
func fetchURL(url string, wg *sync.WaitGroup) {
defer wg.Done() // 通知 WaitGroup,协程已完成
resp, err := http.Get(url)
if err != nil {
fmt.Printf("Error fetching %s: %s\n", url, err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Error reading response body: %s\n", err)
return
}
fmt.Printf("Fetched %s, Length: %d\n", url, len(body))
}
func main() {
urls := []string{
"https://www.example.com",
"https://www.google.com",
"https://www.openai.com",
"https://www.wikipedia.org",
}
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1) // 增加 WaitGroup 计数器
go fetchURL(url, &wg) // 启动协程
}
wg.Wait() // 等待所有协程完成
fmt.Println("All downloads completed.")
}
在这个示例中,我们使用 sync.WaitGroup
来等待所有协程完成。遍历一组网址,并为每个网址启动一个协程。每个协程负责下载一个网页内容,并在完成时通过 Done()
方法通知 WaitGroup
。
channel
管道是Golang在语言级别上提供的goroutine间的通讯方式,我们可以使用channel在多个goroutine之间传递消息。如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
Golang的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。
Go语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。
channel类型
channel是一种类型,一种引用类型。声明管道类型的格式如下:
// 声明一个传递整型的管道
var ch1 chan int
// 声明一个传递布尔类型的管道
var ch2 chan bool
// 声明一个传递int切片的管道
var ch3 chan []int
创建channel
声明管道后,需要使用make函数初始化之后才能使用
make(chan 元素类型, 容量)
举例如下:
// 创建一个能存储10个int类型的数据管道
ch1 = make(chan int, 10)
// 创建一个能存储4个bool类型的数据管道
ch2 = make(chan bool, 4)
// 创建一个能存储3个[]int切片类型的管道
ch3 = make(chan []int, 3)
channel操作
管道有发送,接收和关闭的三个功能
发送和接收 都使用 <- 符号
现在我们先使用以下语句定义一个管道:
ch := make(chan int, 3)
发送
将数据放到管道内,将一个值发送到管道内
// 把10发送到ch中
ch <- 10
取操作
x := <- ch
关闭管道.
通过调用内置的close函数来关闭管道
close(ch)
完整示例
// 创建管道
ch := make(chan int, 3)
// 给管道里面存储数据
ch <- 10
ch <- 21
ch <- 32
// 获取管道里面的内容
a := <- ch
fmt.Println("打印出管道的值:", a)
fmt.Println("打印出管道的值:", <- ch)
fmt.Println("打印出管道的值:", <- ch)
// 管道的值、容量、长度
fmt.Printf("地址:%v 容量:%v 长度:%v \n", ch, cap(ch), len(ch))
// 管道的类型
fmt.Printf("%T \n", ch)
// 管道阻塞(当没有数据的时候取,会出现阻塞,同时当管道满了,继续存也会)
<- ch // 没有数据取,出现阻塞
ch <- 10
ch <- 10
ch <- 10
ch <- 10 // 管道满了,继续存,也出现阻塞
通道的原理:
通道的底层实现使用了 Go 运行时的调度机制,它们是并发安全的数据结构。当协程发送数据到通道时,数据会被复制到通道中,而不是直接传递指针。当协程从通道接收数据时,数据会从通道中移出。Go 运行时确保在合适的时机执行数据的复制和移动,从而避免了竞争条件和并发冲突。
goroutine和channel
package main
import (
"fmt"
"math"
"sync"
)
func putNum(intChan chan int) {
for i := 2; i <= 120000; i++ {
intChan <- i
}
close(intChan)
}
func primeNum(intChan chan int, primeChan chan int) {
for value := range intChan {
var flag = true
for i := 2; i <= int(math.Sqrt(float64(value))); i++ {
if value%i == 0 {
flag = false
break
}
}
if flag {
primeChan <- value
}
}
}
func printPrime(primeChan chan int, wg *sync.WaitGroup) {
defer wg.Done()
for value := range primeChan {
fmt.Println(value)
}
}
var wg sync.WaitGroup
func main() {
intChan := make(chan int, 1000)
primeChan := make(chan int, 1000)
go putNum(intChan)
for i := 0; i < 10; i++ {
wg.Add(1)
go primeNum(intChan, primeChan)
}
wg.Add(1)
go printPrime(primeChan, &wg)
wg.Wait()
fmt.Println("主线程执行完毕")
}
单向管道
有时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中,使用管道都会对其进行限制,比如限制管道在函数中只能发送或者只能接受
默认的管道是 可读可写
// 定义一种可读可写的管道
var ch = make(chan int, 2)
ch <- 10
<- ch
// 管道声明为只写管道,只能够写入,不能读
var ch2 = make(chan<- int, 2)
ch2 <- 10
// 声明一个只读管道
var ch3 = make(<-chan int, 2)
<- ch3
Select多路复用
在某些场景下我们需要同时从多个通道接收数据。这个时候就可以用到golang中给我们提供的select多路复用。 通常情况通道在接收数据时,如果没有数据可以接收将会发生阻塞。
select的使用类似于switch 语句,它有一系列case分支和一个默认的分支。每个case会对应一个管道的通信(接收或发送)过程。select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句。
select {
case <-channel1:
// 执行 channel1 接收操作
case channel2 <- value:
// 执行 channel2 发送操作
default:
// 当没有通道操作可以进行时执行的语句
}
使用select语法模拟一个网络监控程序:
package main
import (
"fmt"
"net/http"
"time"
)
func checkService(serviceName string, url string, statusChan chan<- string) {
for {
_, err := http.Get(url)
if err != nil {
statusChan <- fmt.Sprintf("[%s] is down", serviceName)
} else {
statusChan <- fmt.Sprintf("[%s] is up", serviceName)
}
time.Sleep(time.Second * 5)
}
}
func main() {
httpStatusChan := make(chan string)
dbStatusChan := make(chan string)
go checkService("HTTP Server", "http://localhost:8080", httpStatusChan)
go checkService("Database Server", "http://localhost:5432", dbStatusChan)
for {
select {
case httpStatus := <-httpStatusChan:
fmt.Println(httpStatus)
case dbStatus := <-dbStatusChan:
fmt.Println(dbStatus)
}
}
}
tip:使用select来获取数据的时候,不需要关闭chan,不然会出现问题
Go中的并发安全和锁
在 Go 语言中,并发锁用于控制对共享资源的访问,以避免多个 goroutine 同时修改共享资源而引发的数据竞争和不一致性问题。Go 提供了两种主要类型的并发锁:互斥锁(Mutex)和读写锁(RWMutex)。
互斥锁(Mutex):
互斥锁用于在同一时刻只允许一个 goroutine 访问被保护的资源。它实现了基本的互斥机制,确保任何时刻只有一个 goroutine 可以进入临界区。
工作原理:
当一个 goroutine 需要访问共享资源时,它会尝试获取互斥锁。如果锁当前未被其他 goroutine 持有,那么该 goroutine 将获得锁并可以安全地执行操作。如果锁已经被其他 goroutine 持有,那么当前 goroutine 将被阻塞,直到锁被释放为止。
示例:
下面是一个使用互斥锁的简单示例,展示了如何安全地对共享资源进行操作:
package main
import (
"fmt"
"sync"
"time"
)
var (
counter int
mutex sync.Mutex
)
func increment() {
mutex.Lock()
defer mutex.Unlock()
counter++
fmt.Println("Counter:", counter)
}
func main() {
for i := 0; i < 5; i++ {
go increment()
}
time.Sleep(time.Second)
}
在这个例子中,sync.Mutex
用于创建一个互斥锁。多个 goroutine 同时调用 increment
函数,但由于使用了互斥锁,每次只有一个 goroutine 能够获得锁并修改 counter
变量,从而避免了竞争条件。
读写锁(RWMutex):
读写锁允许多个 goroutine 同时读取共享资源,但只允许一个 goroutine 写入共享资源。这对于读操作远远超过写操作的情况非常有用,因为它可以提高并发性能。
工作原理:
读写锁维护了两个状态:读锁定和写锁定。多个 goroutine 可以同时获取读锁定,但只有一个 goroutine 可以获取写锁定。当写锁定被持有时,不允许获取读锁定,以避免写入和读取同时进行的情况。
示例:
下面是一个使用读写锁的示例,演示了读取和写入共享资源时的情况:
package main
import (
"fmt"
"sync"
"time"
)
var (
data map[string]string
rwMutex sync.RWMutex
)
func readData(key string) {
rwMutex.RLock()
defer rwMutex.RUnlock()
fmt.Println("Reading:", data[key])
}
func writeData(key, value string) {
rwMutex.Lock()
defer rwMutex.Unlock()
data[key] = value
fmt.Println("Writing:", key, value)
}
func main() {
data = make(map[string]string)
go writeData("key1", "value1")
for i := 0; i < 3; i++ {
go readData("key1")
}
time.Sleep(time.Second)
}
在这个例子中,多个 goroutine 同时读取 "key1" 的值,而在同一时刻只有一个 goroutine 写入数据。使用 sync.RWMutex
读写锁,我们可以实现读取的并发性,同时确保写操作的互斥性。