跳转至

第六章:并发编程

6.1 Goroutine

Goroutine 是 Go 语言中的轻量级线程,由 Go 运行时管理。

package main

import (
    "fmt"
    "time"
)

func sayHello() {
    fmt.Println("Hello, Goroutine!")
}

func main() {
    // 启动 goroutine
    go sayHello()

    // 匿名函数 goroutine
    go func() {
        fmt.Println("匿名 goroutine")
    }()

    // 带参数的 goroutine
    go func(msg string) {
        fmt.Println(msg)
    }("带参数的消息")

    // 等待 goroutine 执行
    time.Sleep(time.Second)
    fmt.Println("主函数结束")
}

Goroutine 特点

特点 说明
轻量 初始栈 2KB,可动态伸缩
低开销 创建成本远低于线程
调度 由 Go 运行时调度
数量 可创建数百万个

6.2 Channel

Channel 是 goroutine 之间的通信机制。

package main

import "fmt"

func main() {
    // 创建 channel
    ch := make(chan int)

    // 带缓冲的 channel
    bufferedCh := make(chan int, 3)

    // 发送和接收
    go func() {
        ch <- 42  // 发送
    }()

    value := <-ch  // 接收
    fmt.Println(value)

    // 带缓冲 channel
    bufferedCh <- 1
    bufferedCh <- 2
    bufferedCh <- 3
    // bufferedCh <- 4  // 会阻塞

    fmt.Println(<-bufferedCh)
    fmt.Println(<-bufferedCh)

    // 关闭 channel
    close(ch)

    // 检查 channel 是否关闭
    v, ok := <-ch
    fmt.Println(v, ok)  // 0 false
}

遍历 Channel

package main

import "fmt"

func main() {
    ch := make(chan int, 5)

    // 发送数据
    go func() {
        for i := 1; i <= 5; i++ {
            ch <- i
        }
        close(ch)
    }()

    // 遍历 channel
    for v := range ch {
        fmt.Println(v)
    }
}

6.3 Select

Select 用于处理多个 channel 操作。

package main

import (
    "fmt"
    "time"
)

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "来自 ch1"
    }()

    go func() {
        time.Sleep(200 * time.Millisecond)
        ch2 <- "来自 ch2"
    }()

    // select 多路复用
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println(msg1)
        case msg2 := <-ch2:
            fmt.Println(msg2)
        case <-time.After(300 * time.Millisecond):
            fmt.Println("超时")
        }
    }

    // 非阻塞 select
    select {
    case msg := <-ch1:
        fmt.Println(msg)
    default:
        fmt.Println("没有数据")
    }
}

6.4 并发模式

Worker Pool

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d 处理任务 %d\n", id, job)
        time.Sleep(100 * time.Millisecond)
        results <- job * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    // 启动 worker
    var wg sync.WaitGroup
    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            worker(id, jobs, results)
        }(i)
    }

    // 发送任务
    for i := 1; i <= 10; i++ {
        jobs <- i
    }
    close(jobs)

    // 等待完成
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for result := range results {
        fmt.Println("结果:", result)
    }
}

Pipeline

package main

import "fmt"

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // 创建 pipeline
    nums := generator(1, 2, 3, 4, 5)
    squares := square(nums)

    // 消费结果
    for result := range squares {
        fmt.Println(result)
    }
}

Fan-out/Fan-in

package main

import (
    "fmt"
    "sync"
)

func producer(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func processor(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }

    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    // Fan-out: 多个 goroutine 从同一 channel 读取
    in := producer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    c1 := processor(in)
    c2 := processor(in)
    c3 := processor(in)

    // Fan-in: 合并多个 channel
    for result := range merge(c1, c2, c3) {
        fmt.Println(result)
    }
}

6.5 同步原语

sync.WaitGroup

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Worker %d 完成\n", id)
        }(i)
    }

    wg.Wait()
    fmt.Println("所有 worker 完成")
}

sync.Mutex

package main

import (
    "fmt"
    "sync"
)

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("计数器:", counter.Value())
}

sync.RWMutex

package main

import (
    "fmt"
    "sync"
)

type SafeMap struct {
    mu   sync.RWMutex
    data map[string]int
}

func (m *SafeMap) Get(key string) (int, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    val, ok := m.data[key]
    return val, ok
}

func (m *SafeMap) Set(key string, value int) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

func main() {
    m := &SafeMap{data: make(map[string]int)}
    var wg sync.WaitGroup

    // 写操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            m.Set(fmt.Sprintf("key%d", i), i)
        }(i)
    }

    // 读操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if v, ok := m.Get(fmt.Sprintf("key%d", i)); ok {
                fmt.Println(v)
            }
        }(i)
    }

    wg.Wait()
}

小结

  1. Goroutine 是轻量级线程,创建成本低
  2. Channel 是 goroutine 之间的通信机制
  3. Select 用于多路复用多个 channel
  4. Worker Pool 是常用的并发模式
  5. sync 包提供了互斥锁等同步原语

参考资料