第六章:并发编程¶
6.1 Goroutine¶
Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理。
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello, Goroutine!")
}
func main() {
// 启动 goroutine
go sayHello()
// 匿名函数 goroutine
go func() {
fmt.Println("匿名 goroutine")
}()
// 带参数的 goroutine
go func(msg string) {
fmt.Println(msg)
}("带参数的消息")
// 等待 goroutine 执行
time.Sleep(time.Second)
fmt.Println("主函数结束")
}
Goroutine 特点¶
| 特点 | 说明 |
|---|---|
| 轻量 | 初始栈 2KB,可动态伸缩 |
| 低开销 | 创建成本远低于线程 |
| 调度 | 由 Go 运行时调度 |
| 数量 | 可创建数百万个 |
6.2 Channel¶
Channel 是 goroutine 之间的通信机制。
package main
import "fmt"
func main() {
// 创建 channel
ch := make(chan int)
// 带缓冲的 channel
bufferedCh := make(chan int, 3)
// 发送和接收
go func() {
ch <- 42 // 发送
}()
value := <-ch // 接收
fmt.Println(value)
// 带缓冲 channel
bufferedCh <- 1
bufferedCh <- 2
bufferedCh <- 3
// bufferedCh <- 4 // 会阻塞
fmt.Println(<-bufferedCh)
fmt.Println(<-bufferedCh)
// 关闭 channel
close(ch)
// 检查 channel 是否关闭
v, ok := <-ch
fmt.Println(v, ok) // 0 false
}
遍历 Channel¶
package main
import "fmt"
func main() {
ch := make(chan int, 5)
// 发送数据
go func() {
for i := 1; i <= 5; i++ {
ch <- i
}
close(ch)
}()
// 遍历 channel
for v := range ch {
fmt.Println(v)
}
}
6.3 Select¶
Select 用于处理多个 channel 操作。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "来自 ch1"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "来自 ch2"
}()
// select 多路复用
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
case <-time.After(300 * time.Millisecond):
fmt.Println("超时")
}
}
// 非阻塞 select
select {
case msg := <-ch1:
fmt.Println(msg)
default:
fmt.Println("没有数据")
}
}
6.4 并发模式¶
Worker Pool¶
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d 处理任务 %d\n", id, job)
time.Sleep(100 * time.Millisecond)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// 启动 worker
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
worker(id, jobs, results)
}(i)
}
// 发送任务
for i := 1; i <= 10; i++ {
jobs <- i
}
close(jobs)
// 等待完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Println("结果:", result)
}
}
Pipeline¶
package main
import "fmt"
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 创建 pipeline
nums := generator(1, 2, 3, 4, 5)
squares := square(nums)
// 消费结果
for result := range squares {
fmt.Println(result)
}
}
Fan-out/Fan-in¶
package main
import (
"fmt"
"sync"
)
func producer(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func processor(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// Fan-out: 多个 goroutine 从同一 channel 读取
in := producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
c1 := processor(in)
c2 := processor(in)
c3 := processor(in)
// Fan-in: 合并多个 channel
for result := range merge(c1, c2, c3) {
fmt.Println(result)
}
}
6.5 同步原语¶
sync.WaitGroup¶
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Worker %d 完成\n", id)
}(i)
}
wg.Wait()
fmt.Println("所有 worker 完成")
}
sync.Mutex¶
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("计数器:", counter.Value())
}
sync.RWMutex¶
package main
import (
"fmt"
"sync"
)
type SafeMap struct {
mu sync.RWMutex
data map[string]int
}
func (m *SafeMap) Get(key string) (int, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.data[key]
return val, ok
}
func (m *SafeMap) Set(key string, value int) {
m.mu.Lock()
defer m.mu.Unlock()
m.data[key] = value
}
func main() {
m := &SafeMap{data: make(map[string]int)}
var wg sync.WaitGroup
// 写操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
m.Set(fmt.Sprintf("key%d", i), i)
}(i)
}
// 读操作
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
if v, ok := m.Get(fmt.Sprintf("key%d", i)); ok {
fmt.Println(v)
}
}(i)
}
wg.Wait()
}
小结¶
- Goroutine 是轻量级线程,创建成本低
- Channel 是 goroutine 之间的通信机制
- Select 用于多路复用多个 channel
- Worker Pool 是常用的并发模式
- sync 包提供了互斥锁等同步原语